Flink SQL的另一个小坑坑


本来想着对Flink的基础使用部分讲解就此结束,但是今天在我决定对之前的SQL API根据实际业务情况做一番改动的时候。

咦... 又出幺蛾子了。

关键这个幺蛾子非常的隐蔽,想从官方文档和源码中寻找思路几乎不可能(因为我翻了好久没能找到),而想从现有的一些博客或者中文资料中找线索更是没门。

到底是什么问题呢?

就是之前根据 watermark 和滑动时间窗口来实时计算排名前十的上网IP(client_IP)数量,但是当时为了快速达到验证 API 的效果,就把数据源写入到 kafka 的元数据时间(process_time),给定义为watermarkwindows需要的事件时间

准确来说,这么做是背离业务需求的,因为如果你根据数据进入到kafka的时间来作为event_time,那么你处理的数据永远都是“准时”到达的,无法反映业务的真实状态。

于是,基于这个考虑,我决定把真实的数据时间作为 watermark 和 windows 需要的事件时间


1. 修改之后,有问题的代码

参考代码如下:
package com.anryg.window_and_watermark

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettingsTableEnvironment}


/**
  * @DESC: 用SQL API读取kafka数据,并利用watermark和window功能来对数据进行统计
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkSQLFromKafkaWithWatermarkAndWindow {

    def main(args: Array[String]): Unit = {
        val streamingSetting = EnvironmentSettings.newInstance().inStreamingMode().build()

        val config = new Configuration() //设置checkpoint
        config.setString("execution.checkpointing.interval","10000")
        config.setString("state.backend""filesystem")
        config.setString("state.checkpoints.dir","${name_node}:8020/tmp/checkpoint/FlinkTBFromKafkaWithWatermarkAndWindow")

        streamingSetting.getConfiguration.addAll(config)

        val tableEnv = TableEnvironment.create(streamingSetting)

        tableEnv.executeSql(
            """
              |Create table kafkaTable(
              |client_ip STRING,
              |domain STRING,
              |time STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING,
              |event_time AS to_timestamp(time, 'yyyyMMddHHmmss'), //设置事件时间为实际数据的产生时间
              |watermark for event_time as event_time - interval '10' second  //设置watermark,确定watermark字段
              |)
              |with(
              |'connector' = 'kafka',
              |'topic' = '${topic}',
              |'properties.bootstrap.servers' = '${kafka_broker}:6667',
              |'properties.group.id' = 'FlinkTBFromKafkaWithWatermarkAndWindow',
              |'scan.startup.mode' = 'latest-offset',
              |'value.format'='csv',                                 //确定数据源为文本格式
              |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
              |)
            "
"".stripMargin)

        tableEnv.executeSql(
            """
              |SELECT
              |window_start,
              |window_end,
              |client_ip,
              |count(client_ip) as ip_count
              |FROM TABLE(
              |HOP(                       //确定window策略
              |TABLE kafkaTable,
              |DESCRIPTOR(event_time),
              |INTERVAL '30' SECONDS,   //确定滑动周期
              |INTERVAL '2' MINUTES)    //确定窗口时间间隔
              |)
              |GROUP BY
              |window_start,
              |window_end,
              |client_ip
              |ORDER BY ip_count
              |DESC
              |LIMIT 10
            "
"".stripMargin
        ).print()

    }
}

以上写法,相对于之前的代码(上次Flink的坑,这么填),唯一的改变就是将原来的:
`process_time` TIMESTAMP(3METADATA FROM 'timestamp',
改成了:
event_time AS to_timestamp(time, 'yyyyMMddHHmmss'),
为什么这么改?

原因很简单,因为原本数据源的时间字段样式为:20221212185236,考虑要将其转化为 Flink timestamp 类型,而time 则是真实的数据事件时间字段。

参考了官方文档,它是这么写的:


再结合 to_timestamp 这个函数的用法(同样查看官方文档说明):


所以我可以肯定,上面这个时间字段转换的写法一定没有问题。

但是你猜怎么着?还是报错了。

既然语法没有问题,从报错的提示来看,像是某种书写格式没有符合要求,但是又没有给出具体的信息。

于是,我只能凭经验各种尝试和猜测...


2. 问题解决之后的代码

最终,让我给试出来了,最后能成功运行的代码如下:
package com.anryg.window_and_watermark

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{EnvironmentSettingsTableEnvironment}


/**
  * @DESC: 用SQL API读取kafka数据,并利用watermark和window功能来对数据进行统计
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkSQLFromKafkaWithWatermarkAndWindow {

    def main(args: Array[String]): Unit = {
        val streamingSetting = EnvironmentSettings.newInstance().inStreamingMode().build()

        val config = new Configuration() //设置checkpoint
        config.setString("execution.checkpointing.interval","10000")
        config.setString("state.backend""filesystem")
        config.setString("state.checkpoints.dir","hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafkaWithWatermarkAndWindow")

        streamingSetting.getConfiguration.addAll(config)

        val tableEnv = TableEnvironment.create(streamingSetting)

        tableEnv.executeSql(
            """
              |Create table kafkaTable(
              |client_ip STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING,
              |event_time AS to_timestamp(`time`, 'yyyyMMddHHmmss'), //设置事件时间为实际数据的产生时间,注意time这个字段必须要用``符合括起来
              |watermark for event_time as event_time - interval '10' second  //设置watermark,确定watermark字段
              |)
              |with(
              |'connector' = 'kafka',
              |'topic' = 'qianxin',
              |'properties.bootstrap.servers' = '192.168.211.107:6667',
              |'properties.group.id' = 'FlinkSQLFromKafkaWithWatermarkAndWindow',
              |'scan.startup.mode' = 'latest-offset',
              |'value.format'='csv',                                 //确定数据源为文本格式
              |'value.csv.field-delimiter'='|'                      //确定文本数据源的分隔符
              |)
            "
"".stripMargin)

        tableEnv.executeSql(
            """
              |SELECT
              |window_start,
              |window_end,
              |client_ip,
              |count(client_ip) as ip_count
              |FROM TABLE(
              |HOP(                       //确定window策略
              |TABLE kafkaTable,
              |DESCRIPTOR(event_time),
              |INTERVAL '30' SECONDS,   //确定滑动周期
              |INTERVAL '2' MINUTES)    //确定窗口时间间隔
              |)
              |GROUP BY
              |window_start,
              |window_end,
              |client_ip
              |ORDER BY ip_count
              |DESC
              |LIMIT 10
            "
"".stripMargin
        ).print()

    }
}
你观察到区别了吗?

就是原本代码中:
  event_time AS to_timestamp(time, 'yyyyMMddHHmmss'),
给改成:
  event_time AS to_timestamp(`time`, 'yyyyMMddHHmmss'),
以及,将最开始表字段定义部分的:
  time STRING,
给改为:
  `time` STRING,
问题就解决了,是不是还挺隐蔽的呢。

注意:这里加在 time 字段上面的两个小点点,不是单引号,而是 TAB 键上面的小尖号,切记!

但是我翻了好久的官方文档,楞是没有找到类似的样例说明,只能硬试。

这一点,也更加佐证了我之前说的,我不太喜欢用纯SQL API的原因,因为很多时候它的错误提示实在是太隐晦和抽象了,很难一下子定位到问题的症结。


最后

相比SparkFlink的算子API(Table API)和SQL API无法在同一个Context 下共存,不知道你有没有发现这个问题。

也就是,你要么全部用纯SQL来实现你的整个数据处理流程,从数据源读取、到数据计算、再到最后的数据结果Sink一条龙,中间步骤不可以用普通算子,如调用 selectgroupBy 等函数操作。

Spark 则可以在这两者之间穿插,混合使用,比如你上一个步骤用纯SQL 处理的一个 DataFrame 对象,下一步就可以对这个 DataFrame 结果调用各种算子操作,个人认为会更加灵活便捷一些。

以上参考代码,已经更新到GitHubhttps://github.com/Anryg/internet_behavior_project


你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...



请使用浏览器的分享功能分享到微信等