Flink CDC 采集MySQL 初始化或者指定时间戳时,没有采集到数据

介绍

在使用 Flink 1.13.x 集成 CDC 2.3.0 时,如果遇到时间戳作为起点开始采集但没有采集到数据的问题,可能有以下几个原因:


解决方案

时区问题:Flink 运行机器时区和 MySQL Server 时区不匹配,database.serverTimezone 配置会受到影响。解决办法是手动指定 Flink 运行的时区,和连接的数据库时区信息保持一致1。例如,可以在数据库属性中添加 dbProps.put("database.serverTimezone", "UTC");。

包冲突:Flink 为了解决包冲突,对一些通用的工具包做了 shaded1。Flink CDC Connectors 2.3.0 版本引用了 Flink 1.16.0,这个版本的 Flink 使用了 flink-shaded-guava:30.1.1-jre-15.0 版本。而 Flink 1.13.0 使用的是 flink-shaded-guava:18.0-13.0 版本,两个版本的 shaded package 不一样引起的。解决方法是在 CDC 中再次 shaded 一下,让 CDC 里面引用到的 guava30 变为 guava18。


代码示例

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
        // 创建 Properties 对象
        Properties properties = new Properties();
        // 设置时区为 "Asia/Shanghai"
        properties.setProperty("database.serverTimezone", "Asia/Shanghai");
    MySqlSource mySqlSource = MySqlSource.builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .debeziumProperties(properties)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}


请使用浏览器的分享功能分享到微信等