上次Flink的坑,这么填


上周五的文章中写到Flink1.15.3版本中,尝试通过Table API (非SQL API)来实现流式数据的 window 和 watermark 功能,但是没有成功。

老实说因为这个心结,后面连着几天都不太舒服,也许这就是程序员的倔强与执着吧。

后面我又想了多个办法,虽然最终问题给解决了,但是解决方法我却并不十分满意。


SQL API的解决方案

因为不得不用我不太喜欢的 SQL API,好像目前只有这个解决办法,通过 Table API 想了几个办法都没有搞定(这里感谢一位球友的友情提示),参考代码如下:
package com.anryg.window_and_watermark

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettingsTableEnvironment}


/**
  * @DESC: 用SQL API读取kafka数据,并利用watermark和window功能来对数据进行统计
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkTBFromKafkaWithWatermarkAndWindow {

    def main(args: Array[String]): Unit = {
        val streamingSetting = EnvironmentSettings.newInstance().inStreamingMode().build()

        val config = new Configuration() //设置checkpoint
        config.setString("execution.checkpointing.interval","10000")
        config.setString("state.backend""filesystem")
        config.setString("state.checkpoints.dir","hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkTBFromKafkaWithWatermarkAndWindow")

        streamingSetting.getConfiguration.addAll(config)

        val tableEnv = TableEnvironment.create(streamingSetting)

        tableEnv.executeSql(
            """
              |Create table kafkaTable(
              |`client_ip` STRING,
              |`domain` STRING,
              |`time` STRING,
              |`process_time` TIMESTAMP(3) METADATA FROM 'timestamp', //获取kafka中元数据的时间
              |`target_ip` STRING,
              |`rcode` STRING,
              |`query_type` STRING,
              |`authority_record` STRING,
              |`add_msg` STRING,
              |`dns_ip` STRING,
              |watermark for process_time as process_time - interval '10' second  //设置watermark,确定watermark字段
              |)
              |with(
              |'connector' = 'kafka',
              |'topic' = 'qianxin',
              |'properties.bootstrap.servers' = '192.168.211.107:6667',
              |'properties.group.id' = 'FlinkTBFromKafkaWithWatermarkAndWindow',
              |'scan.startup.mode' = 'latest-offset',
              |'value.format'='csv',                                 //确定数据源为文本格式
              |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
              |)
            """
.stripMargin)

        tableEnv.executeSql(
            """
              |SELECT
              |window_start,
              |window_end,
              |client_ip,
              |count(client_ip) as ip_count
              |FROM TABLE(
              |HOP(                       //确定window策略
              |TABLE kafkaTable,
              |DESCRIPTOR(process_time),
              |INTERVAL '30' SECONDS,   //确定滑动周期
              |INTERVAL '2' MINUTES)    //确定窗口时间间隔
              |)
              |GROUP BY
              |window_start,
              |window_end,
              |client_ip
              |ORDER BY ip_count
              |DESC
              |LIMIT 10
            """
.stripMargin
        ).print()

    }
}
看着确实够简洁的,整个数据的获取和处理过程全部用SQL来完成。

但是我却并不喜欢用这种方式来写,原因如下:

  1. 确实够简洁了,开发者几乎只需要写纯SQL就可以解决数据从数据源、到数据计算、最后到数据sink的整个数据处理链路(这也是Flink鼓吹的一个优势)。

    但是,这样一来就削弱了开发者对计算框架本身原理的理解和相关知识点的掌握,让自己的技能容易变得单一;

  2.  纯SQL的代码在书写时,需要开发者对其SQL语法非常的熟练,在编写过程中要求一气呵成,中间如果有任何语法错误,或者关键词拼写错误,一般的IDE工具都无法进行及时的提示纠正,比较难受;

  3. 纯SQL的代码在运行时比较难以debug,一旦报错,报错原因以及提示的错误位置让人比较难以精确定位,因为程序在实际执行的时候,会将SQL语句生成对应的语法树,然后根据语法树的先后顺序来执行。

    因此实际代码执行的顺序逻辑跟你的SQL是不一致的,导致排查问题的效率会变低。

因此,相比之下我更愿意用算子 API 的方式来编写,其主要优点如下:

  1.  在用算子编写代码时,因为不同的算子代表了不同的功能,你可以清晰的知道每一个算子的用法,输入和输出,可以随时查看其源码搞清楚它的原理和用法;

  2. 在用算子编写代码时,IDE工具都都有非常强大的提示功能,你可以非常清楚的知道当前对象可以调用哪些算子,以及每个算子的返回对象,方便你在编码时选择;

  3. 在代码运行出错时,错误的栈提示也会相对比较清晰,一般都可以定位到具体哪个算子出的错,因为程序的执行先后逻辑基本上就是你算子书写的先后逻辑。

说回刚才的代码,即便是这么写,还是遇到了些坑,比如上面这个代码,我在IDEA上直接运行,是没有任何结果输出的,但它也不报错。

这里排除了业务逻辑、语法逻辑、数据源、硬件资源、以及网络状况的原因。

怀疑就是Flink的bug,或者我本地IEDA环境的问题(但又不像),因为当我把当前代码打包放到yarn集群运行时,又能够正常输出我想要的结果:


但为什么我在本地IDEA运行就跑不出来呢?

但是当我把其中的 window_start 或者 window_end 这个字段去掉,它又能跑出结果了:


上面这些坑,我在周末B站直播 Spark VS Flink 功能的时候,给大家详细演示过,有兴趣的小伙伴可以去看看我B站上的直播回放视频。


最后

以上,是我在初用flink时,实实在在遇到的一些坑,相信那些正在学习用flink开发的小伙伴,一定也会碰到。

大家平时可能会看到各种宣传Flink的技术大会,把这玩意夸的神乎其神,好像玩大数据,如果不用它就不配似的。

但是那些技术大会上,分享(吹嘘) Flink 使用案例的大神们,几乎都不会告诉你,他们在使用过程中,那些让他们痛彻心扉的坎,以及在克服过程中的曲折与酸爽。

任何技术,其初衷是用来解决实际生产问题的,哪种稳定、实用、高效,我们就用哪种,而不是一味的去求新,追潮流。



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...



请使用浏览器的分享功能分享到微信等