Flink的精确一次性消费,真的能保证吗?

来源:安瑞哥是码农


还记得第一次用 Flink 的时候,就被它设置 CheckpointingMode 时,可以选择用 AT_LEAST_ONCE 或者 EXACTLY_ONCE 恢复数据状态的模式给「惊住了。


当然,这个惊,不是感叹 Flink 有多么厉害,而是感慨,保证「精确一次性这么大言不惭的牛逼,是怎么敢吹出来的?


做过大数据开发的都应该知道,计算引擎在一个项目中,它从来都是处于一个「承上启下」的地位,上承数据源,下接数据库,连在一起,就是3个异构的系统在一起配合着干活。


对于任何一次数据计算,想要到达所谓的「精确一次性」,那就要保证这里面3个环节的「幂等」或者说「事务」,而我们最常见的,可以保证事务的系统,是单节点的数据库,比如 MySQL、Oracle这些。


它们之所以能保证这个「精确一次性」,原因在于人家有非常复杂、且完备的日志管理体系,最关键的是,人家只敢保证「在一个系统内部」。


而你 Flink,却要确保3个异构系统的精确一次性,凭啥?


虽然对于 Flink 的这个 EXACTLY_ONCE ,源码解释了它的含义,



之前我在一期「如何确保精确一次性消费」的视频里,也用工程化的思路,拿具体案例讲过它的解决方案,说想要保证这一点,一定不是简单配置某个参数,或者开启某个开关就能做到的,它是一个「系统化」工程。


但总架不住一些 Flink 的崇拜粉坚定地认为:这能有多难的?用 Flink 设置 EXACTLY_ONCE 模式不就解决了吗?还费这些劲?


说这话的人啊,只能说明一点:没有经历过真实环境的毒打,自己掌握的那点知识呢,也就仅仅停留在网络上口口相传的烂大街八股文里,但凡有点脑子跟怀疑精神,就不会这么武断。


那作为一个喜欢给那些「高大上」技术祛魅,热衷扯掉它们身上遮羞布的人,今天咱就来实地演示一下,这个 Flink 的 EXACTLY_ONCE 模式,到底有没有一些人认为的那么神?



0. 祛魅准备


因为这个 CheckpointingMode 的 EXACTLY_ONCE 模式是专门针对「流」场景的,所以,咱们也要找一个流模式下的测试方式。


从以往我测试过的 Flink 诸多场景来看,它既可以用流的方式消费消息队列」数据,也可以消费文件系统」数据。


那么今天,我们就设计用 Flink 流的方式读取 HDFS 的某个文件,



然后将数据经过简单的处理后,写入到 Elasticsearch (下称ES)。


测试过程大致如下:


第1步:Flink 以流的方式读取 HDFS 的全部数据,然后写入到 _id 自动生成的 ES 索引中,看整个数据写完,总数据量有多少条,跟原始数据量是否一致;


第2步:也是用 Flink 以流的方式读取 HDFS 的全部数据,把 CheckpointingMode 设置成 EXACTLY_ONCE 模式,写入到另一个同样 _id 自动生成的 ES 索引中,但是,在数据写入过程中,故意停掉进程,然后通过上一次的 checkpoint 再次恢复,看最终写入的数据量是否符合预期。


第3步:其他参数不变,将 EXACTLY_ONCE 换成 AT_LEAST_ONCE 再测试,看结果有哪些变化;


第4步:其他参数不变,只修改 checkpoint 的时间间隔,再看写入到 ES 的数据量,有多大变化。


测试样例代码如下:


package com.anryg.hdfs

import java.time.Duration
import java.util

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilderElasticsearchSinkRequestIndexer}
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


/**
  * @DESC: 对 Flink 的精确一次性消费进行测试
  * @Auther: Anryg
  * @Date: 2024/4/11 10:46
  */

object FlinkExactlyOnceTest {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(args(0).toLong) //设置Checkpoint时间间隔

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkExactlyOnceTest")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        //控制用哪种Checkpoint模式
        if (args(1).equals("1")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        else env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat,new Path("hdfs://192.168.211.106:8020/tmp/from_doris/b5bb9f7420d84255-900a15f5988d44a3_0.csv"))
                                    .monitorContinuously(Duration.ofSeconds(3))
                                    .build()

        import org.apache.flink.streaming.api.scala._  //引入隐私转换函数

        val fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"file-from-hdfs-2ES")

        val builder = new Elasticsearch7SinkBuilder()
                .setHosts(new HttpHost("192.168.221.173",9201,"http"))
                .setEmitter((element:String, context:SinkWriter.Context, indexer: RequestIndexer) =>{

                    val array = element.split(",")

                    var client_ip = ""var domain = ""var time = ""var target_ip = ""var rcode = "99"var query_type = "99"var authority_record = ""var add_msg = ""var dns_ip = ""
                    try {
                        if (array(0) != null) client_ip = array(0)
                        if (array(1) != null) domain = array(1)
                        if (array(2) != null) time = array(2)
                        if (array(3) != null) target_ip = array(3)
                        if (array(4) != null) rcode = array(4)
                        if (array(5) != null) query_type = array(5)
                        if (array(6) != null) authority_record = array(6)
                        if (array(7) != null) add_msg = array(7)
                        if (array(8) != null) dns_ip = array(8)
                    } catch {
                        case e: Exception => println(element, e)
                    }

                    val map = new util.HashMap[String,String]() /**必须是Java的map类型*/
                    map.put("client_ip", client_ip)
                    map.put("domain", domain)
                    map.put("time", time)
                    map.put("target_ip", target_ip)
                    map.put("rcode", rcode)
                    map.put("query_type", query_type)
                    map.put("authority_record", authority_record)
                    map.put("add_msg", add_msg)
                    map.put("dns_ip", dns_ip)

                    val indexRequest = Requests.indexRequest()
                        .index(args(2)) /**目标索引*/
                        .source(map)

                    indexer.add(indexRequest)
                }
                ).build()

        fileDS.sinkTo(builder.asInstanceOf[ElasticsearchSink[String]])/**这里需要进行强制转换*/

        env.execute() //启动任务,action按钮
    }
}



1. 正常写入时


Flink 用流方式读取上面 HDFS 的一千多万条数据,写入到 ES 后,ES 索引的数据量情况:



用 EXACTLY_ONCE 模式消费后(当前任何模式都可以),写入到 ES 的数据量,跟原始数据量是一样的,证明当前整个 Flink 的数据读取,处理,以及写入逻辑没有问题。


在写入过程中,也可以发现,Flink 确定是有对中间状态进行存储的(注意,如果是 batch 模式,这个状态存储的文件夹为空,也就是不保持状态)。




2. 程序中断后再恢复写入时 


以 EXACTLY_ONCE 模式启动 Flink 进程,然后把数据写入到另一个 ES 索引里。


然后,当数据写入到两百多万的时候,我手动把进行给停掉。



看下这个时候,写入 ES 的数据量:


跟 Flink 进程里看到的数量保持一致,这里没有问题。


再瞅一眼这个时候写入到 HDFS 的状态文件:



不为空,说明我们可以用这个,来恢复刚才中断的数据写入过程。


你猜,恢复之后,数据写入到 ES 的数据总量会是多少呢?


通过从 checkpoint 恢复之后,再次写入 ES,整个数据全部写完,



可以看到,它又写了 9015208 条,加上在进程恢复之前的 2126000 条,就得到了如下写入到 ES 的总量:



很明显,根本达不到我们需要的「精确一次性」要求


写入到 ES 的数据总量,比实际数据量,多出了整整 69000 条。


通过这个结果我们可以知道,这个「状态恢复」肯定是起到作用了的,要不然,写入的数据总量就会多出 2126000 条,只不过,达不到它宣称的 EXACTLY_ONCE 语义。


整个表现给人的感觉就是:这不就是妥妥的「至少一次性」语义吗?



3. 对「至少一次性」语义的测试


既然在我们的实践过程中,EXACTLY_ONCE 」的表现跟AT_LEAST_ONCE 」是一样的。


那么如果我们把语义换成真正的AT_LEAST_ONCE 」,表现又会是怎么样的呢?


同样,启动程序,这次,其他参数全部不变,我们把 checkpoint 模式,给改成用 AT_LEAST_ONCE ,换一个新的 ES 索引再跑。


同样,在写到跟上次差不多数据量的时候,让我给 cancel 掉了(实在是没有办法保证跟上次的量一样,眼睛都给我瞪疼了)



这个时候,写入到 ES 的数据量为:


接着,我们用 checkpoint 进行恢复,重启任务,这次 Flink 处理的数据总量为:



而写入到 ES 的数据总量为:


可以看到,这个时候的数据总量,比预期的多了 287000 条。


相比上面测试的 EXACTLY_ONCE,又多了 278000 条。


可以明显看出来,不管是 Flink 的 EXACTLY_ONCE 还是的的 AT_LEAST_ONCE 语义,在本次场景中,都会导致数据的写入重复,只不过,前者重复的量少一点,后者更多一些而已



4. 调整 checkpoint 时间后


这次,继续用 EXACTLY_ONCE 的模式,只不过把 checkpoint 的时间间隔,从之前的60秒,给改成了3秒,其他条件不变。


同样,中间手动断掉一次,通过检查点再次恢复程序后的 ES 数据总量如下:


可以明显看出来,通过缩短 checkpoint 时间间隔,这个重复消费的数据量在变小,由之前的 69000 条,缩小为 3000 条


但依然,做不到所谓的「精确一次性」。



最后


从这次设计的 Flink 流式计算场景得出的结论来看,Flink 所谓的「精确一次性」最多只是停留在理论层面,而在实际工作中,几乎不可能达到


正如一开始我分析的那样,一个异构的,由3个不同系统组成的工作流,想通过设置一个简单的参数,就能实现类似幂等,或者事务的功能,多少有点「痴人说梦」。


而且要知道,这次的测试,我用的还只是手动 cancel 这种非常「优雅且理想」的方式让 Flink 进程退出,而如果是碰到比如,数据库端在写入数据时的异常,或者因为数据问题导致的退出,那情况会不会变得更糟糕?


如果你有兴趣,下次,咱们可以再测试一下,Flink 通过读取 kafka 数据,以及用流聚合的计算方式,再次验证这个所谓的「精确一次性」是不是还依然的这样,不靠谱。


对了,关于这次对比的详细过程,我准备这个周末通过直播演示出来,有兴趣的同学可以多多点赞,让我看到你们的热情哟。

请使用浏览器的分享功能分享到微信等