Flink CDC Mysql 启动模式详解

Flink Sql 语法

我们先来看一下使用 flink sql 的方式如何通过 flink cdc 采集 mysql 数据:

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(105),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
WITH (
  'connector' = 'mysql',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);

启动模式

从上述例子中,我们可以在WITH属性中配置各种参数,参数比如:数据库的用户名、密码、表名等信息。当然我们也可以设置 flink 任务启动时消费数据的方式(scan.startup.mode),主要有以下五种:

① initial

initial是默认的参数值,在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

② latest-offset

在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。

③ earliest-offset

不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

④ specific-offset

不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过配置scan.startup.specific-offset.filescan.startup.specific-offset.pos的方式来指定从特定Binlog文件名和偏移量启动,也可以通过配置scan.startup.specific-offset.gtid-set指定从某个GTID集合启动。

scan.startup.specific-offset.file

使用指定位点模式启动时,启动位点的Binlog文件名。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

scan.startup.specific-offset.gtid-set

使用指定位点模式启动时,启动位点的GTID集合。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

⑤ timestamp

不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

scan.startup.timestamp-millis

使用指定时间模式启动时,启动位点的毫秒时间戳。在使用指定时间时,MySQL CDC会从最早Binlog开始读取,直至Binlog事件的时间戳大于等于指定的时间戳后开始向下游发送数据。因此请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。

写在最后

同样的,通过 Flink Datastream 的方式也可以实现。

MySqlSource mysqlSource = MySqlSource.builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("demo")
                .password("123456")
                .databaseList("demo")
                .tableList("table1")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserializationSchemaMysql())
                .build();

通过设置 startupOptions 参数我们就可以指定 Flink CDC 的启动方式了。

请使用浏览器的分享功能分享到微信等