介绍
在使用 Flink 1.13.x 集成 CDC 2.3.0 时,如果遇到时间戳作为起点开始采集但没有采集到数据的问题,可能有以下几个原因:
解决方案
时区问题:Flink 运行机器时区和 MySQL Server 时区不匹配,database.serverTimezone 配置会受到影响。解决办法是手动指定 Flink 运行的时区,和连接的数据库时区信息保持一致1。例如,可以在数据库属性中添加 dbProps.put("database.serverTimezone", "UTC");。
包冲突:Flink 为了解决包冲突,对一些通用的工具包做了 shaded1。Flink CDC Connectors 2.3.0 版本引用了 Flink 1.16.0,这个版本的 Flink 使用了 flink-shaded-guava:30.1.1-jre-15.0 版本。而 Flink 1.13.0 使用的是 flink-shaded-guava:18.0-13.0 版本,两个版本的 shaded package 不一样引起的。解决方法是在 CDC 中再次 shaded 一下,让 CDC 里面引用到的 guava30 变为 guava18。
代码示例
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
// 创建 Properties 对象
Properties properties = new Properties();
// 设置时区为 "Asia/Shanghai"
properties.setProperty("database.serverTimezone", "Asia/Shanghai");
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}