Flink跟Spark流状态聚合这个坑,你趟过没?

来源:安瑞哥是码农


上篇文章写了 Flink 跟 Spark 针对实时流聚合计算的初步对比,从功能上来看,这两款计算引擎,都能够支持对流状态下的数据进行聚合计算。


但是,从上次的测试来看,功能上满足了对数据的聚合要求,可是当我把每一次流的聚合结果写入到 Doris 的去重表时,尴尬发现,Flink 跟 Spark 最终的聚合结果,居然跟正确的聚合结果对不上


Flink跟Spark流计算状态对比,你感兴趣不?


由于上次测试的数据有四百多万条,虽然量不大,但是如果想彻查出问题,并把问题通过文章给大家演示出来,难度依然比较大。


于是,就想着,既然只是体现最终聚合的结果不准,那咱把测试的数据量再给减少点演示,效果是一样的。


(本次测试基于 Flink1.15,Spark3.2 开展)



0. 验证设计


还是取 Clickhouse(下称CK) 的一张表为数据源,简单起见,这里只写入140条数据,经过聚合之后的结果如下(睁大眼睛注意这个 SQL 的写法,一会要考):


这个聚合结果,就是我们今天要测试对比的标准答案。


后续,我会将这个表的所有数据,以一定的频率推到 Kafka,然后分别用 Flink 跟 Spark 流的方式对其进行聚合统计,并把聚合的过程,以及最终结果都给拿出来对比。


希望通过这种全过程的对比的方式,揪出聚合结果不对的问题所在。



1. Flink 聚合结果展示


该部分经过调试之后的代码如下:


package com.anryg

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: 测试 Flink SQL 在流式计算下的聚合计算,并将结果写入到Doris
  * @Auther: Anryg
  * @Date: 2024/4/7 17:20
  */

object FlinkGroupByTest {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(60000L)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式

        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkGroupByTest")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**第一步:读取kafka数据源*/
        tableEnv.executeSql(
            """
             Create table data_from_kafka(
             |client_ip STRING,
             |domain STRING,
             |`time` STRING,
             |target_ip STRING,
             |rcode STRING,
             |query_type STRING,
             |authority_record STRING,
             |add_msg STRING,
             |dns_ip STRING
             |)
             |with(
             |'connector' = 'kafka',
             |'topic' = 'test',
             |'properties.bootstrap.servers' = '192.168.211.107:6667',
             |'properties.group.id' = 'FlinkGroupByTest',
             |'scan.startup.mode' = 'latest-offset',
             |'value.format'='csv',                                 //确定数据源为文本格式
             |'value.csv.field-delimiter'=','                      //确定文本数据源的分隔符
             |)
            "
"".stripMargin)

        /**第二步:创建Doris映射表*/
        tableEnv.executeSql(
            s"""
               |CREATE TABLE data_from_flink01 (
               |client_ip STRING,
               |domain STRING,
               |`count` BIGINT
               |)
               |    WITH (
               |      'connector' = 'doris',
               |      'fenodes' = '192.168.221.173:8030',
               |      'table.identifier' = 'example_db.data_from_flink01',
               |      'username' = 'root',
               |      'password' = 'xxx',
               |     'sink.label-prefix' = '${args(0)}'  //导入标签前缀,每次要不一样
               |)
            "
"".stripMargin)

        /**第三步:写聚合结果到Doris表*/
        tableEnv.executeSql(
            """
              |INSERT INTO data_from_flink01
              |SELECT
              |    client_ip,
              |    lower(domain) AS domain,
              |    count(*) AS `count`
              |FROM data_from_kafka
              |GROUP BY
              |    client_ip,
              |    domain
            "
"".stripMargin)
    }
}


运行之后,全部数据写入到 Doris 表的结果如下:



怎么样?是不是跟上面的标准聚合结果不一样


为了进一步找到问题,再把计算的中间过程,给打印出来(为了方便展示,有部分中间结果被我省略掉了):



怎么样?看到这个懵逼的状态结果,是不是让你一脸懵逼?有些明明可以聚合到一起的记录,却被硬生生地分开了。


没事,再来看一眼 Spark 的,同样的懵逼结果,还会再次上演。



2. Spark 聚合结果展示


同样,对于 Spark 调试之后的代码如下:


package com.anryg.bigdata.streaming.stateful


import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{LevelLogger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputModeTrigger}

/**
  * @DESC: 测试 Spark 在流式计算下的聚合计算,并将结果写入到Doris
  * @Auther: Anryg
  * @Date: 2024/4/7 11:23
  */

object SparkGroupByTest {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkGroupByTest").setMaster("local[*]")
        val spark = SparkSession.builder()
            .config("spark.sql.streaming.stateStore.providerClass",
                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")//指定流计算的状态存储为RocksDB,3.2之后支持
            .config(conf)
            .getOrCreate()

        val rawDF = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers""192.168.211.107:6667")
            .option("subscribe""test")
            .option("failOnDataLoss",false)
            .option("fetchOffset.numRetries",3)
            .option("startingOffsets","latest")
            .load()

        import spark.implicits._
        val df1 = rawDF.selectExpr("CAST(value AS string)")
            .map(row =>{
                val line = row.getAs[String]("value")
                val fieldArray:Array[String] = line.split(",")
                fieldArray
            }).map(array => {
            var client_ip = ""var domain = ""var time = ""var target_ip = ""var rcode = ""var query_type = ""var authority_record = ""var add_msg = ""var dns_ip = ""
            try {
                client_ip = array(0)
                domain = array(1)
                time = array(2)
                target_ip = array(3)
                rcode = array(4)
                query_type = array(5)
                authority_record = array(6)
                add_msg = array(7)
                dns_ip = array(8)
            } catch {
                case e:Exception => println(array,e)
            }

            (array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8))
        })
            .toDF("client_ip","domain","time","target_ip","rcode","query_type","authority_record","add_msg","dns_ip")

        df1.createOrReplaceTempView("data_from_kafka")

        /**聚合计算逻辑,取出聚合结果*/
        val df2 = spark.sql(
                    """
                      |SELECT
                      |    client_ip,
                      |    lower(domain) AS domain,
                      |    count(*) AS count
                      |FROM data_from_kafka
                      |GROUP BY
                      |    client_ip,
                      |    domain
                    "
"".stripMargin)

        /**必须要对结果的格式进行重新定义为json string,否则直接无法写入Doris*/
        val df3 = df2.map(row => {
                    val json = new JSONObject(3)  /**将数据封装为json对象*/
                    val client_ip = row.getString(0)
                    val domain = row.getString(1)
                    val count = row.getLong(2)

                    json.put("client_ip",client_ip)
                    json.put("domain", domain)
                    json.put("count", count)
                    json.toJSONString
                }).toDF()

       df3.writeStream
            .outputMode(OutputMode.Complete()) //确定输出模式,因为是聚合结果,要用complete模式
            .format("doris")
            .option("doris.table.identifier""example_db.data_from_spark01")
            .option("doris.fenodes","192.168.221.173:8030")
            .option("user","root")
            .option("password","xxx")
            .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/spark_checkpoint/test/SparkGroupByTest"/**这个必须加上,记录offset以及计算状态*/
            .start()
            .awaitTermination()
    }
}

同样,消费完所有推给 Kafka 的数据之后,写入到 Doris 表的数据如下:


怎么样?对于这个结果,你惊不惊喜,意不意外?虽然人家结果不对,但是有个性啊,跟 Flink 错得不重样呢。 


同样的,我们再来瞅一眼它全部的中间输出结果:


同样的,这个懵逼的中间过程,有没有让你再次感到懵逼?



3. 揪出原因


那导致上面聚合结果不准确的原因在哪呢,是整个流程设计不对?还是代码逻辑有问题?还是这个实时的聚合方式压根就是唬人的?


经过一番换数据,换 SQL 写法之后发现,尼玛,原因居然在 SQL 方言上


先看 CK SQL:


还记得刚开始对 CK 数据源表的聚合 SQL 吗?


它是这么写的:


SELECT
    client_ip,
    lower(domainAS domain,
    count(*) AS count
FROM data01
GROUP BY
    client_ip,
    domain

第1个SQL写法


但其实,它也可以这么来写:


SELECT
    client_ip,
    lower(domainAS lower_domain,
    count(*) AS sum
FROM data01
GROUP BY
    client_ip,
    lower_domain

第2个SQL写法


得到的聚合结果是一样的。


这个说明什么?说明对于 CK 来说,在 Select 中给字段取的别名,它在 group by 条件中是能准确识别的


再看 Flink SQL:


当我把原本 Flink 中的 SQL 换成上面的第2个写法之后,来看下运行效果:


发现没,它不能像 CK 那样聪明,在它的 group by 条件里,识别不了 select 中指定的别名。


再看 Spark:


同样,把原本 Spark 中的 SQL 换成上面的第2个写法之后,你猜怎么着?


不同于 Flink SQL 的是,Spark SQL 在 where 条件里,能识别这个别名


而且,能够跑出正确的聚合结果。


只不过,对于别名的命名,必须要区别于原字段名(这点跟 CK 又不一样)。


所以,这个聚合结果不对的根本原因在于:Flink 跟 Spark SQL 的 group by 条件中,对 domain 这个字段的聚合,还停留在没有经过 lower() 函数处理之后的原始数据上


这就解释了,在上面的中间结果显示中,明明是同一条记录,为什么没有聚合到一起,因为他们的原始数据里,domain 这个字段值,是有大小写区别的。



最后


从这次 Flink 和 Spark 流聚合结果不正确的原因分析来看,既不是流程设计问题,也不是计算逻辑的问题,核心关键在于:SQL 方言不一样


对于这次测试,同一个聚合 SQL,我们用在了3个地方,CK SQL、Flink SQL,以及 Spark SQL,但是表现却各不一样。


其中,CK 最为智能,它能非常准确地理解我的数据处理要求,不仅能在 group by 条件中准确识别我的字段别名,即便我将新字段的别名,命名成跟老字段一样lower(domain) as domain」,它也能准确领会我的意思。


但对于 Spark 来说,虽然它也能在 group by 条件中识别别名,但是,这个别名,你不能把它命名成跟原字段一样的名字,否则,Spark 会把它理解成原字段,而不是重命名后的新字段,这样一来,也会带来聚合结果的不准确。


而对于 Flink SQL,在这一点上的表现则最矬,人家直接在 group by 件里,压根就识别不了你在 select 中指定的别名,对于本次测试的聚合 SQL,想要聚合结果正确,你只能这么来写:



那么对于这个坑,你有趟过吗?


请使用浏览器的分享功能分享到微信等