Read data from Kafka and, in real time, compute the top ten internet users (by client IP count) over a 2-minute time window, with a 30-second slide (i.e. the result is refreshed every 30s) and a tolerated data lateness of 10s, then write the result to the console.
Doesn't it look fairly simple? It is essentially a wordcount over a sliding time window. Still, don't underestimate this case: production work often boils down to exactly this kind of requirement, and doing it well is not as easy as it seems.
A quick analysis of the requirement:
Since we are counting client_ip over time, we only need two fields, client_ip and the access time (time); aggregating them over a time window then gives the result.
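For reference, each raw record in Kafka is assumed here to be a single pipe-separated line with 9 fields, of which field 0 is the client_ip, field 1 the visited domain, and field 2 the access time in yyyyMMddHHmmss form (the remaining fields are not used); this layout is inferred from the parsing code below, and a purely hypothetical record might look like:
192.168.21.31|www.example.com|20221130100401|...|...|...|...|...|...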
The requirement is implemented below with Spark Structured Streaming and with Flink's APIs, respectively:
1. The Spark implementation
First the Structured Streaming version; the reference code is as follows:
package com.anryg.bigdata.streaming.demo.window_watermark
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
/**
* @DESC: Count client_ip with a time window and a watermark (a sliding-window wordcount)
* @Author: Anryg
* @Date: 2022/11/30 10:04
*/
object WorldCountWithWatermark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WorldCountWithWatermark").setMaster("local")
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
Logger.getLogger("org.apache").setLevel(Level.WARN) //reduce INFO log output
val rawDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "${kafka_broker}:6667")
.option("subscribe", "${topic}")
//.option("group.id","test9999") /**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
.option("failOnDataLoss",false)
.option("fetchOffset.numRetries",3)
//.option("maxOffsetsPerTrigger",Integer.MAX_VALUE)/**用于限流,限定每个批次取的数据条数,确定写入HDFS单个文件的条数*/
.option("startingOffsets","latest")
.load()
import spark.implicits._
val df1 = rawDF.selectExpr("CAST(value AS string)")
.map(row =>{
val line = row.getAs[String]("value")
val fieldArray:Array[String] = line.split("\\|")
fieldArray
})
.filter(_.length == 9) //keep only records with exactly 9 fields
.filter(_(1).endsWith("com")) //restrict the visited domains a little so the volume stays manageable
.map(array =>{
val date = new SimpleDateFormat("yyyyMMddHHmmss").parse(array(2)) //HH for the 24-hour clock
val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
(array(0), Timestamp.valueOf(time)) //time will serve as the watermark field, so it must be a Timestamp in yyyy-MM-dd HH:mm:ss form
})
.toDF("client_ip", "time") //attach the schema
import org.apache.spark.sql.functions._ /**import Spark's built-in functions*/
val df2 = df1.withWatermark("time", "10 seconds") //usually used together with window
.groupBy(window($"time","2 minutes","30 seconds"), $"client_ip") //group by client_ip plus the 2-minute aggregation window sliding every 30 seconds
.count()
.orderBy($"count".desc)
.limit(10)
val query = df2.writeStream
.format("console") //打印到控制台
.option("truncate", false) //将结果的内容完整输出,默认会砍掉内容过长的部分
.option("numRows",30) //一次最多打印多少行,默认20行
.option("checkpointLocation","hdfs://${namenode}/tmp/offset/WorldCountWithWatermark") //确定checkpoint目录
//.outputMode(OutputMode.Update())//不支持排序的结果
.outputMode(OutputMode.Complete()) //确定输出模式,默认为Append
.start()
query.awaitTermination()
}
}
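2. The Flink implementation
First the DataStream API version; the reference code is as follows: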

package com.anryg.window_and_watermark
import java.text.SimpleDateFormat
import java.time.Duration
import java.util.Locale
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @DESC: Read Kafka data with the DataStream API and count client_ip over a sliding window with a watermark
* @Author: Anryg
* @Date: 2022/8/14 19:08
*/
object FlinkDSFromKafkaWithWatermark {
private final val hdfsPrefix = "hdfs://${namenode}:8020"
def main(args: Array[String]): Unit = {
//get the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE) //enable checkpointing
env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix + "/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark") //set the HDFS directory for checkpoints
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //set the retention policy for externalized checkpoints
val kafkaSource = KafkaSource.builder() //build the Kafka source
.setBootstrapServers("${kafka_broker}:6667")
.setTopics("${topic}")
.setGroupId("FlinkDSFromKafkaWithWatermark")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
import org.apache.flink.streaming.api.scala._ //import the implicit conversions
val kafkaDS = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks()
,"kafka-data") //read the source into a DataStream
val targetDS = kafkaDS.map(line => { //simple ETL on the raw data
line.split("\\|")
}).filter(_.length == 9).filter(_(1).endsWith("com")) //keep 9-field records whose domain ends with "com"
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofHours(10)) //specify the watermark strategy
.withTimestampAssigner(new SerializableTimestampAssigner[Array[String]] {
override def extractTimestamp(element: Array[String], recordTimestamp: Long): Long = {
val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
sdf.parse(element(2)).getTime //the watermark field must be a Long epoch-millisecond timestamp
}
}))
.map(array => (array(0), 1)) //map each record to (client_ip, 1)
.keyBy(kv => kv._1) //key by client_ip
.window(SlidingProcessingTimeWindows.of(Time.minutes(2), Time.seconds(30))) //assign the window; the assigner here has to be based on processing time rather than event time, because the data timestamps lag far behind the current wall-clock time
.sum(1) //count per client_ip within each window
targetDS.print() //print the result
env.execute("FlinkDSFromKafkaWithWatermark") //launch the job
}
}
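The same job can also be written by converting the DataStream into Flink's Table API and using the watermark there; the reference code is as follows: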

package com.anryg.window_and_watermark
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @DESC: Read Kafka data, convert from the DataStream API to the Table API, and use the watermark
* @Author: Anryg
* @Date: 2022/8/14 19:08
*/
object FlinkTBFromKafkaWithWatermark {
private final val hdfsPrefix = "hdfs://192.168.211.106:8020" //HDFS address prefix
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment //get the streaming execution environment
.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE) //enable checkpointing
val tableEnv = StreamTableEnvironment.create(env) //create the table environment
env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix + "/tmp/flink_checkpoint/FlinkTBFromKafkaWithWatermark") //set the HDFS directory for checkpoints (each job should use its own directory)
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //set the retention policy for externalized checkpoints
val kafkaSource = KafkaSource.builder()
.setBootstrapServers("192.168.211.107:6667")
.setTopics("qianxin")
.setGroupId("FlinkTBFromKafkaWithWatermark")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
val kafkaDS = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data")
val targetDS = kafkaDS.map(_.split("\\|"))
.filter(_.length == 9)
.filter(_(1).endsWith("com"))
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) //assign the watermark on the business time field, tolerating 10s of lateness
.withTimestampAssigner(new SerializableTimestampAssigner[Array[String]] {
override def extractTimestamp(element: Array[String], recordTimestamp: Long): Long = { //extract the event-time timestamp
val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
sdf.parse(element(2)).getTime
}
}))
.map(array => (array(0), array(2))) //keep only (client_ip, raw time string)
.map(kv => {
val rawTime = kv._2
val date = new SimpleDateFormat("yyyyMMddHHmmss").parse(rawTime)
val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
(kv._1, Timestamp.valueOf(time)) //convert the time into the required time attribute type, i.e. Timestamp
})
import org.apache.flink.table.api._ //import the implicit conversions, otherwise the $ below cannot be resolved
val targetTable = tableEnv.fromDataStream(targetDS)
.as("client_ip", "time") //添加schema
.window(
Slide over 1.minute every 30.seconds() on $"time" as $"w" //加入window
)
.groupBy($"client_ip", $"w")
.select(
$"client_ip",
$"w".start(), //时间窗口的开始时间
$"w".end(), //时间窗口的解释时间
$"client_ip".count() as "count"
)
.orderBy($"count")
.limit(10)
targetTable.execute().print()
}
}



