Flink cdc读取mysql数据,需要mysql开启哪些配置

介绍

    Flink CDC 是一个用于从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。它可以实现 Flink SQL 采集+计算+传输(ETL)一体化。

开启配置

  • 开启 binlog 功能,并设置 binlog 格式为 row 模式。binlog 是 MySQL 的二进制日志文件,它记录了数据库中的所有变更操作,Flink CDC 会通过 binlog 来捕获数据的变化。

  • 开启 GTID 功能,并设置 server_id 为唯yi值。GTID 是全局事务标识符,它是一个唯yi的标识符,用于跟踪每个事务在整个复制拓扑中的状态。Flink CDC 会通过 GTID 来保证数据的一致性和容错性。

  • 授予 Flink CDC 连接 MySQL 的用户必要的权限,包括 SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SHOW VIEW 等。这些权限是 Flink CDC 读取数据和元数据所必需的。

参考文章

https://blog.csdn.net/lhyandlwl/article/details/129998737

注意点

cdc导包为:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

超时报错信息:

java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_221]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_221]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_221]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_221]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_221]
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_221]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_221]
	at sun.net.(HttpClient.java:735) ~[?:1.8.0_221]
	at sun.net.(HttpClient.java:678) ~[?:1.8.0_221]
	at sun.net.(HttpURLConnection.java:1593) ~[?:1.8.0_221]
	at sun.net.(HttpURLConnection.java:1498) ~[?:1.8.0_221]
	at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) ~[?:1.8.0_221]
	at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:315) ~[flink-metrics-prometheus-1.13.2.jar:1.13.2]
	at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:138) ~[flink-metrics-prometheus-1.13.2.jar:1.13.2]
	at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:63) [flink-metrics-prometheus-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:494) [LoadMySqlFlinkCDC.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]

出现上面的错误,可配置参数

// 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间
props.setProperty("connect.timeout", String.valueOf(10 * 60 * 1000));

checkpoint的间隔也要调整为合适的时间,比如和上面的等待时间一样



请使用浏览器的分享功能分享到微信等