Flink Sql 语法
我们先来看一下使用 flink sql 的方式如何通过 flink cdc 采集 mysql 数据:
CREATE TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '' ,
'port' = '3306',
'username' = '' ,
'password' = '' ,
'database-name' = '' ,
'table-name' = ''
);
启动模式
从上述例子中,我们可以在WITH属性中配置各种参数,参数比如:数据库的用户名、密码、表名等信息。当然我们也可以设置 flink 任务启动时消费数据的方式(scan.startup.mode),主要有以下五种:
① initial
initial是默认的参数值,在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
② latest-offset
在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。
③ earliest-offset
不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
④ specific-offset
不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos的方式来指定从特定Binlog文件名和偏移量启动,也可以通过配置scan.startup.specific-offset.gtid-set指定从某个GTID集合启动。
scan.startup.specific-offset.file
使用指定位点模式启动时,启动位点的Binlog文件名。文件名格式例如mysql-bin.000003。
scan.startup.specific-offset.pos
使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。
scan.startup.specific-offset.gtid-set
使用指定位点模式启动时,启动位点的GTID集合。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。
⑤ timestamp
不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。
scan.startup.timestamp-millis
使用指定时间模式启动时,启动位点的毫秒时间戳。在使用指定时间时,MySQL CDC会从最早Binlog开始读取,直至Binlog事件的时间戳大于等于指定的时间戳后开始向下游发送数据。因此请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。
写在最后
同样的,通过 Flink Datastream 的方式也可以实现。
MySqlSource mysqlSource = MySqlSource.builder()
.hostname("127.0.0.1")
.port(3306)
.username("demo")
.password("123456")
.databaseList("demo")
.tableList("table1")
.startupOptions(StartupOptions.initial())
.deserializer(new CustomerDeserializationSchemaMysql())
.build();
通过设置 startupOptions 参数我们就可以指定 Flink CDC 的启动方式了。