Introduction
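Flink CDC is a source component that reads both the full (snapshot) data and the incremental change data directly from databases such as MySQL and PostgreSQL. With it, Flink SQL can handle collection, computation, and transport (ETL) in a single pipeline.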
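To make "full data plus incremental changes" concrete, here is a toy Java model. It copies a table snapshot first, then applies row-level change events in log order. This is only an illustration under simplifying assumptions: a single table, an integer primary key, and string row values. The class SnapshotThenChangelog and its event type are hypothetical, and they do not reflect how Flink CDC is implemented internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical): NOT Flink CDC's internal algorithm.
// Illustrates "read the full snapshot first, then apply incremental changes".
final class SnapshotThenChangelog {

    // Hypothetical change kinds, loosely mirroring row events in a ROW-format binlog.
    enum Op { INSERT, UPDATE, DELETE }

    static final class ChangeEvent {
        final Op op;
        final int key;      // primary key of the affected row
        final String after; // new row value; null for DELETE

        ChangeEvent(Op op, int key, String after) {
            this.op = op;
            this.key = key;
            this.after = after;
        }
    }

    // Phase 1: copy the snapshot (the input map is not modified).
    // Phase 2: apply change events in the order they appear in the log.
    static Map<Integer, String> replay(Map<Integer, String> snapshot, List<ChangeEvent> changes) {
        Map<Integer, String> state = new LinkedHashMap<>(snapshot);
        for (ChangeEvent e : changes) {
            switch (e.op) {
                case INSERT:
                case UPDATE:
                    state.put(e.key, e.after);
                    break;
                case DELETE:
                    state.remove(e.key);
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Integer, String> snapshot = new LinkedHashMap<>();
        snapshot.put(1, "alice");
        snapshot.put(2, "bob");

        List<ChangeEvent> changes = new ArrayList<>();
        changes.add(new ChangeEvent(Op.UPDATE, 1, "alice-v2"));
        changes.add(new ChangeEvent(Op.DELETE, 2, null));
        changes.add(new ChangeEvent(Op.INSERT, 3, "carol"));

        System.out.println(replay(snapshot, changes)); // prints {1=alice-v2, 3=carol}
    }
}
```

The snapshot alone would be stale as soon as the table changes. Replaying the log on top of it is what keeps the downstream copy current.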
Required configuration
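- Enable the binlog and set the binlog format to ROW. The binlog is MySQL's binary log, which records every change made to the database. Flink CDC reads the binlog to capture data changes.
- Enable GTID and set server_id to a unique value. A GTID (global transaction identifier) uniquely identifies each transaction across the entire replication topology, so the state of every transaction can be tracked. Flink CDC relies on GTIDs to keep data consistent and to recover from failures.
- Grant the MySQL user that Flink CDC connects with the privileges it needs, including SELECT, REPLICATION SLAVE, REPLICATION CLIENT, and SHOW VIEW. Flink CDC needs these privileges to read both data and metadata.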
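The checks above can be sketched in Java as follows. CdcPrerequisites is a minimal, hypothetical helper and is not part of Flink CDC. It assumes you have already fetched the relevant server variables into a map, for example with SHOW VARIABLES. It flags settings that do not match the list, and it builds a GRANT statement for the privileges named there. It cannot verify that server_id is unique across servers; it only checks that the value is set and non-zero. The user name flink_cdc and host % are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: validates values obtained from e.g.
//   SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','gtid_mode','server_id');
// It does not connect to MySQL itself.
final class CdcPrerequisites {

    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin must be ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format must be ROW");
        }
        if (!"ON".equalsIgnoreCase(vars.get("gtid_mode"))) {
            out.add("gtid_mode must be ON");
        }
        // Only checks that server_id is set and non-zero; uniqueness needs a cross-server check.
        String serverId = vars.get("server_id");
        if (serverId == null || "0".equals(serverId.trim())) {
            out.add("server_id must be set to a unique non-zero value");
        }
        return out;
    }

    // GRANT for the privileges listed above. REPLICATION SLAVE and
    // REPLICATION CLIENT are global privileges, hence ON *.*.
    // Placeholder inputs only: user/host are concatenated without escaping.
    static String grantSql(String user, String host) {
        return "GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '"
                + user + "'@'" + host + "';";
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED");
        vars.put("gtid_mode", "ON");
        vars.put("server_id", "1");
        System.out.println(problems(vars)); // prints [binlog_format must be ROW]
        System.out.println(grantSql("flink_cdc", "%"));
    }
}
```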
Reference article
https://blog.csdn.net/lhyandlwl/article/details/129998737
Notes
Import for the CDC source:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
Timeout error message:
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_221]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_221]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_221]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_221]
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_221]
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_221]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_221]
    at sun.net.(HttpClient.java:735) ~[?:1.8.0_221]
    at sun.net.(HttpClient.java:678) ~[?:1.8.0_221]
    at sun.net.(HttpURLConnection.java:1593) ~[?:1.8.0_221]
    at sun.net.(HttpURLConnection.java:1498) ~[?:1.8.0_221]
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) ~[?:1.8.0_221]
    at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:315) ~[flink-metrics-prometheus-1.13.2.jar:1.13.2]
    at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:138) ~[flink-metrics-prometheus-1.13.2.jar:1.13.2]
    at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:63) [flink-metrics-prometheus-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:494) [LoadMySqlFlinkCDC.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_221]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
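If this error occurs, you can configure the following parameter. The frames above show that the read timeout was raised while PrometheusPushGatewayReporter was pushing metrics to the Prometheus PushGateway over HTTP.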
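java.util.Properties props = new java.util.Properties();
// Maximum time the connector waits while connecting to the MySQL server before timing out (10 minutes, in milliseconds)
props.setProperty("connect.timeout", String.valueOf(10 * 60 * 1000));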
Also adjust the checkpoint interval to a suitable value, for example the same as the wait time above.
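One way to keep the two settings aligned is to derive both of them from a single Duration, as in this sketch. TimeoutSettings is a hypothetical helper. The property name connect.timeout and the millisecond unit are taken from the snippet above. In a real job, the interval would be passed to StreamExecutionEnvironment.enableCheckpointing(long); that call is left out here so the example runs without Flink on the classpath.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper: derive the connector timeout and the checkpoint interval
// from one Duration so the two values cannot drift apart.
final class TimeoutSettings {

    static final Duration WAIT = Duration.ofMinutes(10);

    // Same property as in the note above; value expressed in milliseconds.
    static Properties connectorProps(Duration timeout) {
        Properties props = new Properties();
        props.setProperty("connect.timeout", String.valueOf(timeout.toMillis()));
        return props;
    }

    // Checkpoint interval in milliseconds, here simply equal to the timeout.
    static long checkpointIntervalMs(Duration timeout) {
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        Properties props = connectorProps(WAIT);
        System.out.println(props.getProperty("connect.timeout")); // prints 600000
        System.out.println(checkpointIntervalMs(WAIT));           // prints 600000
    }
}
```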