Spark跟Flink的聚合状态恢复,谁能保证精确一次性?

来源:安瑞哥是码农


上两篇文章分别测试了 Flink 跟 Spark 以流的方式读取 HDFS 数据和 kafka 数据,做简单的数据处理之后,写入 Elasticsearch。


然后分别用手动 cancel 的方式,以及设计程序遇到数据问题的方式,分别都退出进程。


最后,通过彼此的 checkpoint 来恢复数据,从最终的恢复效果来看,在没有采取其他任何辅助措施的情况下,Flink 跟 Spark 都不能实现数据的「精确一次性」。


但好在,对于一个生产系统来说,我们在乎的,是数据入库之后的「最终一致性」,而对于这个要求,通过设置每条数据的唯一 id,就能搞定。


但是,如果这个时候,咱们把之前的「简单 ETL 入库」,换成「聚合结果入库」,情况还能一样吗?


理论上分析,如果这个时候跟之前一样,中间过程数据出现了重复,那么聚合结果就要比预期的要大,即便这个时候设置了唯一 id (利用去重表),也弥补不了这个错误。


那到底实际情况是怎样的呢?咱们再来实测一次



0. 场景准备


数据源还是用上次写入到 Kafka 的那一份一千多万的数据,而为了更好的演示最终的聚合结果,这次把写入结果的数据库从 ES 换成 Doris 的去重模型表。


具体流程是这样式的:



同时设计的 Doris 表结构如下:


CREATE TABLE `agg_result_table` (
  `client_ip` varchar(50),
  `sum` int(11)
)
UNIQUE KEY(`client_ip`)
DISTRIBUTED BY HASH(`client_ip`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);


先上 Flink SQL API 的代码:


package com.anryg.doris

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 通过Flink SQL API读取kafka数据,对数据进行聚合,最后把结果写入到Doris去重表,测试聚合状态的精确一次性
  * @Auther: Anryg
  * @Date: 2024/4/18 11:29
  */

object FlinkSQLAggStatusTest {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(100000L)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkSQLAggStatusTest")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**第一步:读取kafka数据源*/
        tableEnv.executeSql(
            """
             Create table data_from_kafka(
             |client_ip STRING,
             |domain STRING,
             |`time` STRING,
             |target_ip STRING,
             |rcode STRING,
             |query_type STRING,
             |authority_record STRING,
             |add_msg STRING,
             |dns_ip STRING
             |)
             |with(
             |'connector' = 'kafka',
             |'topic' = 'test',
             |'properties.bootstrap.servers' = '192.168.211.107:6667',
             |'properties.group.id' = 'FlinkSQLAggStatusTest',
             |'scan.startup.mode' = 'timestamp',                     //指定用时间戳的读取方式
             |'scan.startup.timestamp-millis' = '1713182435000',    //指定消费的时间戳位置
             |'value.format'='csv',                                 //确定数据源为文本格式
             |'value.csv.field-delimiter'=','                      //确定文本数据源的分隔符
             |)
            "
"".stripMargin)

        /**第二步:创建Doris映射表*/
        tableEnv.executeSql(
            s"""
            |CREATE TABLE agg_from_flink (
            |`client_ip` STRING,
            |`sum` BIGINT
            |)
            |    WITH (
            |      'connector' = 'doris',
            |      'fenodes' = '192.168.221.173:8030',
            |      'table.identifier' = 'example_db.agg_from_flink02',
            |      'username' = 'root',
            |      'password' = 'xxxx',
            |     'sink.label-prefix' = '${args(0)}'  //导入标签前缀,每次要不一样
            |)
            "
"".stripMargin)

        /**第三步:数据写入到Doris表中*/
        tableEnv.executeSql(
            """
            |INSERT INTO agg_from_flink
            |SELECT
            |client_ip,
            |count(*)
            |from
            |data_from_kafka
            |GROUP BY
            |client_ip
            "
"".stripMargin)
    }
}


再上 Spark structed streaming API 的代码:


package com.anryg.bigdata.streaming.doris

import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{LevelLogger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
  * @DESC: 测试 Spark 在流式模式下的聚合计算,并将结果写入到Doris,测试结果是否能做到精确一次性
  * @Auther: Anryg
  * @Date: 2024/4/18 11:23
  */

object SparkSQLAggStatusTest {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkSQLAggStatusTest").setMaster("local[*]")
        val spark = SparkSession.builder()
            .config("spark.sql.streaming.stateStore.providerClass",
                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")//指定流计算的状态存储为RocksDB,3.2之后支持
            .config(conf)
            .getOrCreate()

        val rawDF = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers""192.168.211.107:6667")
            .option("subscribe""test")
            .option("failOnDataLoss",false)
            .option("fetchOffset.numRetries",3)
            .option("startingTimestamp","1713182435000"//通过时间戳指定消费位置
            .load()

        import spark.implicits._
        val df1 = rawDF.selectExpr("CAST(value AS string)")
            .map(row =>{
                val line = row.getAs[String]("value")
                val fieldArray:Array[String] = line.split(",")
                fieldArray
            }).map(array => {
            var client_ip = ""var domain = ""var time = ""var target_ip = ""var rcode = ""var query_type = ""var authority_record = ""var add_msg = ""var dns_ip = ""
            try {
                client_ip = array(0)
                domain = array(1)
                time = array(2)
                target_ip = array(3)
                rcode = array(4)
                query_type = array(5)
                authority_record = array(6)
                add_msg = array(7)
                dns_ip = array(8)
            } catch {
                case e:Exception => println(array,e)
            }
            (client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
        }).toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

        df1.createOrReplaceTempView("data_from_kafka")

        /**聚合计算逻辑,取出聚合结果*/
        val df2 = spark.sql(
                    """
                      |SELECT
                      |client_ip,
                      |count(*) AS sum
                      |FROM data_from_kafka
                      |GROUP BY
                      |client_ip
                    "
"".stripMargin)

        /**必须要对结果的格式进行重新定义为json string,否则直接无法写入Doris*/
        val df3 = df2.map(row => {
                    val json = new JSONObject(2)  /**将数据封装为json对象*/
                    val client_ip = row.getString(0)
                    val count = row.getLong(1)

                    json.put("client_ip",client_ip)
                    json.put("sum", count)
                    json.toJSONString
                }).toDF()

       df3.writeStream
            .outputMode(OutputMode.Complete()) //确定输出模式,因为是聚合结果,要用complete模式
            .format("doris")
            .option("doris.table.identifier""example_db.agg_from_spark01")
            .option("doris.fenodes","192.168.221.173:8030")
            .option("user","root")
            .option("password","xxxx")
            .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/spark_checkpoint/SparkSQLAggStatusTest"/**这个必须加上,记录offset以及计算状态*/
            .start()
            .awaitTermination()
    }
}

简单起见,这次只做了一个非常 easy 的聚合。



1. 基准结果测试


开始,分别执行 Flink 跟 Spark 的代码,将准备好的数据源在没有程序中断的情况下,全部入库,目的是为了测试整个代码逻辑,以及流程的合理性。


在跑之前,我们先来看一眼原始数据的聚合结果



取聚合数量排在前20的部分结果,用作后续的对比(事实证明,只用这部分就够了)。


接着,用 Flink 代码也完整跑一遍,得到如下的最终结果:


可以看到,跟上面的标准结果保持一致。


然后,再用 Spark 代码完整跑一遍,得到如下的最终结果:


同样,也跟上面的标准结果保持一致。


说明,当前无论是 Flink 还是 Spark 的代码逻辑,以及设计的这个写入流程,是没有问题的。



2. Flink 聚合状态恢复测试


这一次,咱们再把程序先给跑起来,把数据写入到另外一个 Doris 去重表里,然后,挑一个良辰吉时,把任务 cancel 掉。



此时,写入到 Doris 表的数据情况为(此时的数据还没有完全跑完):



接着,再利用 Flink 的精确一次性语义,通过 checkpoint 进行状态恢复,看最终写入到表里的聚合结果,是否符合预期。



等剩下的所有数据跑完,全部写入到 Doris 去重表后的数据结果为:


经过对比,有点出乎我意料的是,最终这个聚合结果是符合预期的(如果出现数据重复,会导致聚合结果比预期要大)。


说明,基于当前聚合计算的场景下,Flink 利用其「精确一次」语义进行状态恢复时,是真的可以达到结果的「最终一次性」的


但是,这里需要注意的一点,那就是 Flink 的checkpoint 时间间隔问题,不宜设置过长,这样会导致任务重启的时候,进程还没有来得及 checkpoint,会重复读取之前所有处理过的数据(你猜,这样会给最终的聚合结果带来什么影响?)。



3. Spark 聚合状态恢复测试 


也是先把进程跑起来,然后,瞅准一个时机,把进程 kill 掉,这个时候,写入到 Doris 表的数据情况如下:


接着,利用 Spark 的 checkpoint 进行状态恢复(直接重启就行)。


最终,写入到 Doris 的结果是这样的:

 


经过比对,跟预期的正确结果保持一致。


说明,基于当前聚合计算的场景下,Spark 直接重启进程进行状态恢复时,也是可以达到结果的「最终一次性」的



最后


从这次的测试结论来看,由于有了前两次的刻板映像,所以一开始,我会不自觉的认为,无论是 Spark 还是 Flink,通过状态恢复后,依然会出现数据重复,这样也就会导致最终的聚合结果「比实际值大」。


但经过实测之后发现,并没有,并且这两者「聚合状态恢复」后的结果,都是符合预期的正确结果


只不过在测试过程中,有几个比较有意思的现象,那就是一开始,我都用本地 idea 环境跑的时候(相同资源,相同并行度情况下),Flink 的入库效率要明显低于 Spark。


但是,换成集群环境后,这俩货的执行效率,就发生了倒挂。


不知道你对本次测试的过程有没有兴趣,如果有,可以关注我周末的直播,咱们一起再来玩一遍如何?


最后,你可以思考一个问题,如果我在状态恢复时,把 Flink 的「精确一次性」换成「至少一次性」,你觉得最终的聚合结果,还会是对的吗?

请使用浏览器的分享功能分享到微信等