Spark3.2 VS Flink1.15关于window和watermark功能

(PS:本次验证用到的组件分别为:kafka2.0.0、Spark3.2.0、Flink1.52.3 
涉及到的代码已经全部提交到实战项目中:
https://github.com/Anryg/internet_behavior_project) 

上篇文章写了关于 Flink 和 Spark 的一个非常简单的流式处理案例,虽然需求很简单,但也算是完成了一个完整流式数据处理的流程。

整个流程分为:数据源读取、数据ETL处理、数据结果的sink。

经过这么一个完整的处理流程,你就能感受到一个大数据工具如何将其应用到生产,去解决一个实际问题时会存在哪些实实在在的坑。

继上次过一些坑之后,这次我领着大家继续,这次咱们分别来看看Spark和Flink流式处理中的 window 和 watermark 功能,同样还是用之前给大家提供的数据集,具体需求场景如下:

从kafka读取数据源,实时统计时间窗口为2分钟的,排名前十的上网用户的数量(客户端IP数量),滑动窗口为30秒(即分析结果每30s更新一次),且数据容忍的迟到时间为10s,然后把结果输出到控制台。

看着是不是也挺简单的,其实就是一个基于时间滑动窗口worldcount,但是也别小瞧这个案例,生产上很多时候就是面对这样的一些需求,但是想做好,也没有想象中那么简单。


简单分析一下这个需求:

既然是统计client_ip基于时间的数量,那么就只需要取client_ip以及上网时间(time)这两个字段就可以了,然后基于时间窗口进行聚合就能得到结果。


下面分别从Spark的structured streaming以及Flink的API来实现:



1.Spark的实现


先看Structuredstreaming的实现,参考代码如下:

package com.anryg.bigdata.streaming.demo.window_watermark

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.log4j.{LevelLogger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
  * @DESC: 用时间窗口和watermark来进行client_ip的worldcount统计
  * @Auther: Anryg
  * @Date: 2022/11/30 10:04
  */

object WorldCountWithWatermark {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WorldCountWithWatermark").setMaster("local")
        val spark = SparkSession.builder()
                .config(conf)
                .getOrCreate()
        Logger.getLogger("org.apache").setLevel(Level.WARN//减少INFO日志的输出

        val rawDF = spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers""${kafka_broker}:6667")
                .option("subscribe""${topic}")
                //.option("group.id","test9999") /**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
                .option("failOnDataLoss",false)
                .option("fetchOffset.numRetries",3)
                //.option("maxOffsetsPerTrigger",Integer.MAX_VALUE)/**用于限流,限定每个批次取的数据条数,确定写入HDFS单个文件的条数*/
                .option("startingOffsets","latest")
                .load()

        import spark.implicits._
        val df1 = rawDF.selectExpr("CAST(value AS string)")
                .map(row =>{
                    val line = row.getAs[String]("value")
                    val fieldArray:Array[String] = line.split("\\|")
                    fieldArray
                })
                .filter(_.length == 9//确定字段数必须为9个
                .filter(_(1).endsWith("com")) //防止数量太大,对访问的网站做的一点限制
               .map(array =>{
                    val sdf = new SimpleDateFormat("yyyyMMddhhmmss").parse(array(2))
                    val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(sdf)
                    (array(0), Timestamp.valueOf(time)) //因为time这个字段要作为watermark字段,它必须是yyyy-MM-dd HH:mm:ss样式的Timestamp类型
                })
                .toDF("client_ip""time"//添加schema

        import org.apache.spark.sql.functions._  /**引入spark内置函数*/

        val df2 = df1.withWatermark("time""10 seconds"//一般需要跟window一起配合使用
                .groupBy(window($"time","2 minutes","30 seconds"), $"client_ip"//确定具体字段,以及对应的聚合时间窗口,和滑动窗口
                .count()
                .orderBy($"count".desc)
                .limit(10)

        val query = df2.writeStream
                .format("console"//打印到控制台
                .option("truncate"false//将结果的内容完整输出,默认会砍掉内容过长的部分
                .option("numRows",30//一次最多打印多少行,默认20行
                .option("checkpointLocation","hdfs://${namenode}/tmp/offset/WorldCountWithWatermark"//确定checkpoint目录
                //.outputMode(OutputMode.Update())//不支持排序的结果
                .outputMode(OutputMode.Complete()) //确定输出模式,默认为Append
                .start()

        query.awaitTermination()
    }

}

来看下运行效果,如下:

其实代码还是挺简单的(代码中有比较详细的注释),整个内容完全参考官网,也挺好理解的,个人认为这方面的文档比FLink的官方文档要更加清晰,易懂,不信,你可以去做个对比。


2.Flink DataStream的实现

因为对于Flink来说,实时的API有两套,个DataStream API另一个是Table API。

因为就在前两天,我在用Table API的时候遇到一个大坑,当时因为时间有限,没有趟过去(不过现在已经趟过了),于是这次就想着还是先用DataStreamAPI吧。

来,对应的代码实现如下:
package com.anryg.window_and_watermark

import java.text.SimpleDateFormat
import java.time.Duration
import java.util.Locale

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssignerWatermarkStrategy}
import org.apache.flink.api.common.serialization.{SimpleStringEncoderSimpleStringSchema}
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindowsSlidingProcessingTimeWindowsTumblingEventTimeWindowsTumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time
import org.apache.flink.streaming.api.windowing.time.Time


/**
  * @DESC: 读取kafka数据,从DataStream到HDFS
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkDSFromKafkaWithWatermark {

    private final val hdfsPrefix = "hdfs://${namenode}:8020"

    def main(args: Array[String]): Unit = {
        //获取流任务的环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
                .enableCheckpointing(10000CheckpointingMode.EXACTLY_ONCE//打开checkpoint功能

        env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix + "/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark"//设置checkpoint的hdfs目录
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略

        val kafkaSource = KafkaSource.builder()  //获取kafka数据源
                .setBootstrapServers("${kafka_broker}:6667")
                .setTopics("${topic}")
                .setGroupId("FlinkDSFromKafkaWithWatermark")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build()

        import org.apache.flink.streaming.api.scala._  //引入隐私转换函数

        val kafkaDS = env.fromSource(kafkaSource,
            WatermarkStrategy.noWatermarks()
            ,"kafka-data"//读取数据源生成DataStream对象

        val targetDS = kafkaDS.map(line => { //对数据源做简单的ETL处理
            line.split("\\|")
        }).filter(_.length == 9).filter(_(1).endsWith("com"))
                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofHours(10)) //指定watermark
                        .withTimestampAssigner(new SerializableTimestampAssigner[Array[String]] {
                            override def extractTimestamp(element: Array[String], recordTimestamp: Long): Long = {
                                val sdf = new SimpleDateFormat("yyyyMMddhhmmss")
                                sdf.parse(element(2)).getTime  //指定的watermark字段必须是Long类型的时间戳
                            }
                        }))
                .map(array => (array(0), 1))
                .keyBy(kv => kv._1) //根据client_ip聚合
                .window(SlidingProcessingTimeWindows.of(Time.minutes(2), Time.seconds(30)))  //指定window,这里的window assigner必须是基于Process Time而不是Event Time,因为数据时间跟当前真实时间相差有点多
                .sum(1)

        targetDS.print() //打印结果

        env.execute("FlinkDSFromKafkaWithWatermark"//启动任务
    }
}
但是,这个实现好像并不十分优雅,一来这个结果不好排序,二来显示的内容没有上面的spark实现的好看。

输出的截图如下,凑合看一下:


总感觉这个输出结果怪怪的,实际生产我们肯定不能这么用,看来这还得用Table API。


3.  Flink Table API的实现

来,下面这部分代码就是我用Table API的实现,完全根据官方文档一步步来的:
package com.anryg.window_and_watermark

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssignerWatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment



/**
  * @DESC: 读取kafka数据,从DataStream API转为Table API,并利用watermark
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkTBFromKafkaWithWatermark {
    private final val hdfsPrefix = "hdfs://192.168.211.106:8020"//HDFS地址前缀

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment //获取流环境变量
                                            .enableCheckpointing(10000CheckpointingMode.EXACTLY_ONCE//打开checkpoint功能

        val tableEnv = StreamTableEnvironment.create(env)  //创建Table环境变量
        env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix + "/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark"//设置checkpoint的hdfs目录
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略

        val kafkaSource = KafkaSource.builder()
                        .setBootstrapServers("192.168.211.107:6667")
                        .setTopics("qianxin")
                        .setGroupId("FlinkTBFromKafkaWithWatermark")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build()
        val kafkaDS = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data")
        val targetDS = kafkaDS.map(_.split("\\|"))
                                .filter(_.length == 9)
                                .filter(_(1).endsWith("com"))
                                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))  //给业务字段分配watermark
                                                                                .withTimestampAssigner(new SerializableTimestampAssigner[Array[String]] {
                                                                                    override def extractTimestamp(element: Array[String], recordTimestamp: Long): Long = { //实现watermark字段的分配
                                                                                        val sdf = new SimpleDateFormat("yyyyMMddhhmmss")
                                                                                        sdf.parse(element(2)).getTime
                                                                                    }
                                }))
                                .map(array => (array(0), array(2)))
                                .map(kv => {
                                    val date = kv._2
                                    val sdf = new SimpleDateFormat("yyyyMMddhhmmss").parse(date)
                                    val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(sdf)
                                    (kv._1, Timestamp.valueOf(time)) //将时间转为要求的Time Attributes 也就是Timestamp类型
                                })

        import org.apache.flink.table.api._  //加入隐式转换,否则下面的$无法识别

        val targetTable = tableEnv.fromDataStream(targetDS)
                                .as("client_ip""time"//添加schema
                                .window(
                                    Slide  over 1.minute every 30.seconds() on $"time" as $"w"  //加入window
                                )
                                .groupBy($"client_ip", $"w")
                                .select(
                                    $"client_ip",
                                    $"w".start(), //时间窗口的开始时间
                                    $"w".end(),  //时间窗口的解释时间
                                    $"client_ip".count() as "count"
                                )
                                .orderBy($"count")
                                .limit(10)
        targetTable.execute().print()
    }
}
看着好像是那么回事,但是尴尬的是,我完全运行不起来,下面是官方文档的内容,我全是是照着实现的:

就算我把上面Slide滑动方式改成了Tumble的方式,也是一样的,不能运行,老是报如下的错:


说什么时间类型不符合要求,可我明明已经根据要求,把时间字段类型改成了TimeAttribute类型啊,为毛还是不行呢?

折腾良久,遂作罢,此处省去6个字脏话...

但是当我其他条件不变,只是把window这个功能注释掉之后,就又可以运行了,运行结果如下:

也就是说,是这个window功能在这里作梗,但是很明显,这个结果没有任何意义,我要的是基于时间窗口的聚合结果,而现在是来一条聚合一条。

好了,就这样,不再继续折腾了,也许水平还没到位,有些地方还没有研究透彻,后面再投入精力继续,而如果你能搞定这个问题,欢迎联系我,向你学习一下。


总结一下

如果你仔细对比,你会发现无论是代码的简洁性,还是官方文档的友好程度,现阶段,我都认为Spark的要明显优于Flink。

我根据Sparkstructuredstreaming的官方文档,很快就能把这个目标需求实现了,从看文档开始到代码落地,估计也就1小时左右,代码简洁易懂。

但是我用FLink,可以说是费了老劲了,同样也是根据官方文档要求,但是实现过程中总能给你一些不大不小的“惊喜”,让你的心情酸爽无比。

也许是还没有适应Flink的这种玩法,当然也有可能是水平还不够,后续我会再继续投入精力,一定会想办法把这些坑给填平,也希望你持续关注我后面文章的更新。

最后,周天晚8点,准备在B站直播我这段时间对比Spark和Flink一些功能时遇到的坑,如果有兴趣,欢迎来围观。


你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...



请使用浏览器的分享功能分享到微信等