大数据实战项目——Flink案例遇到的坑


最近在用Flink和Spark的Structured Streaming基于同一个需求场景,做了一个对比测试,目的在于看这两种不同技术实现的流处理手段,在我们的实际开发场景中,哪个更加便捷和好用?

其实在我经历的这么多大大小小的大数据项目中,生产环境中我一直用的都是Spark,基于Spark生态,你能叫出名字的我基本上都用了个遍,而且很多功能都是深度使用的。

所以对于分布式计算引擎来说,个人认为,能够把Spark玩转,一定能满足你各种不同场景下的计算需求,如果不能,那只能说明你还不够熟。

当然,这里的玩转,需要你对其各个模块的功能、原理,底层实现都非常熟悉才可以,只有这样,当你面对棘手问题时,才能有足够的信心和底气去搞定它们。

熟悉我的小伙伴都知道,我一向对那些比较新的、广告打的特别多的技术有种本能的警惕,因为这一般就意味着该技术不够成熟、不够稳定、生态不够健全等等诸多弊病。

而Flink则是这其中的一位,它以更先进和更合理的流处理思想而闻名,那么今天咱就以一个非常简单、实用的场景来聊聊Flink的解决方案。

场景如下:

将kafka中的流式数据通过Flink,经过简单的清洗转换,最后写到HDFS文件系统,并将其保存为CSV(普通文本)格式。

够简单对不对,为了你更加清晰这个流程,咱简单画个图:


下面我将完成这件事情的具体步骤,记录如下:


1. 部署Flink on yarn
这个过程其实特别简单,只需要将flink官网提供的压缩包下载下来,然后放到一台你想提交flink任务的机器上就可以,也叫flink客户端机器。

为什么这么做,原因也很简单,因为原本我们有现成的大数据集群,而且我们当前的集群分布式任务都是通过yarn来管理的,目前已经管理了MR、Tez和Spark任务。

那突然加入一个Flink,当然也不会去搞特殊,也是为了避免增加系统的管理复杂度,同样想到用YARN来管理Flink任务。

跟Spark性质一样,但凡计算引擎,本质上就是一个不需要启动任何服务的SDK,你只需要找到一个能提交该任务的机器作为客户端就可以了,然后在提交Flink任务的时候,让它知道Hadoop的环境变量就可以,其他所有配置都不需要做任何更改。

因为你一旦设置了Hadoop的环境变量,那么就相当于把Hadoop相关的配置都告诉了当前用户,这里当然就包括yarn的集群地址。因此当该用户向yarn提交flink任务时,自然而然就提交成功。

事实证明确实如此,你只需要把在官网下载的压缩包,放到合适的目录下进行解压(我下载的次新版1.15.3),然后添加对应的用户,这里我用flink用户。

下载的flink版本

接着,在flink用户的环境配置文件里添加Hadoop的环境变量,这个flink部署就已经完成了。

添加Hadoop环境变量

是不是so easy?

部署完成后,那为了证明我们的部署是OK的,一定要跑个demo试试看,正好官网也给我们提供好了案例。

在当前机器的flink家目录下用flink用户运行如下命令:

flink on yarn的提交命令

当你看到如下输出的时候,证明你的flink on yarn环境已经部署好了:

flink on yarn提交任务后的输出



2. Flink table API编码部分

部署flink环境确实很简单,但是接下来就是坑比较多的地方了。

首先是flink读取kafka,我最开始是想着用flink的Table API来处理,于是根据官方文档的要求,该引入的jar包也都引入了,也创建了kafka的source table,同时又创建了HDFS的Sink table,如下面的过程:

创建kafka source table 写hdfs文件


但是,运行报错了:

大概查了下资料,没有很快找到解决办法,算了,放弃,证明当前方式可能不好用,后面再找个专门时间去折腾。


3. Flink DataStream API编码部分

果断选择用DataStream API来实现,虽然也有很多坑,但是好在最终调试通过,目前任务提交到yarn上运行了几个小时,一切正常。

具体代码代码如下,写的比较粗糙,仅提供你参考:
package com.anryg

import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.{SimpleStringEncoderSimpleStringSchema}
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


/**
  * @DESC: 读取kafka数据,从DataStream到HDFS
  * @Auther: Anryg
  * @Date: 2022/8/14 19:08
  */

object FlinkDSFromKafka2HDFS {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment //获取流任务的环境变量

        val kafkaSource = KafkaSource.builder()  //获取kafka数据源
                .setBootstrapServers("${kafka_node}:6667")
                .setTopics("your_topic")
                .setGroupId("FlinkDSFromKafka2HDFS2")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build()

        import org.apache.flink.streaming.api.scala._  //引入隐私转换函数
        val kafkaDS = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data"//读取数据源生成DataStream对象

        val targetDS = kafkaDS.map(line => { //对数据源做简单的ETL处理
            line.split("\\|")
        }).filter(_.length == 9).map(array => (array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))

        /**基于flink1.14之后新的,文件系统的sink策略,跟官网提供的不一致,有坑*/
        val hdfsSink2 = StreamingFileSink.forRowFormat(new Path("hdfs://${namenode}:8020/tmp/flink_sink2"),
            new SimpleStringEncoder[(String,String,String,String,String,String,String,String,String)]("UTF-8"))
                //.withBucketAssigner(new DateTimeBucketAssigner) /**默认基于时间分配器*/
                .withRollingPolicy( //设置文件的滚动策略,也就是分文件策略,也可以同时设置文件的命名规则,这里暂时用默认
            DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofSeconds(300)) //文件滚动间隔,设为5分钟,即每5分钟生成一个新文件
                    .withInactivityInterval(Duration.ofSeconds(20)) //空闲间隔时间,也就是当前文件有多久没有写入数据,则进行滚动
                    .withMaxPartSize(MemorySize.ofMebiBytes(500)) //单个文件的最大文件大小,设置为500MB
                    .build()).build()

        targetDS.addSink(hdfsSink2) //目标DataStream添加sink策略

        env.execute("FlinkDSFromKafka2HDFS"//启动任务
    }
}
具体的代码已经上传到GitHub,有需要的小伙伴可自行下载。

GitHub地址:https://github.com/Anryg/internet_behavior_project

虽然是一个非常简单的需求,但是如果你是第一次玩,一定会遇到非常多的坑,下面就来给你细数一下,我现在还能想起来的坑有哪些:

  1. flink的新版本(具体版本不记得),已经不再跟Hadoop绑定(目前我用的1.15.3就是),这样就导致当你把API写好,往HDFS写数据时,运行时发现,缺少了Hadoop的相关依赖,于是必须给加上,但是错误提示并不告诉你缺少了哪些,官网也没有说,你只能一个个来试;

  2.  按理说讲flink处理好的数据写入到HDFS,这个非常常用的文件系统应该是非常简单的事情,可是从官方文档来看,情况并不是这样的,我如果告诉你官方文档提供的API是错的,你信吗?

    官方文档提供的API是这样的:



    但是,1.15.2这个版本根本就没有这个类,然后我去maven中央仓库去查了下,发现这个类对应的jar包最高的版本也只到1.14.6。



    所以,只能去各种查资料,最后才知道,原来新版本的flink对于文件系统的Sink方式改成了用 StreamingFileSink 这个新类,坑爹的玩意,可是它在官网压根就没有提及,你只能各种找,然后凭经验去试错,才能最终把问题解决。

  3. 还有在查相关资料的时候,你会发现Flink的API特别的乱,同样都是写数据到HDFS,flink不同版本之间的实现方式都不一样,关键变化还不是一点点,如果之前的老版本,比如1.10左右的,想升级到比较新的版本,你会发现之前写的api完全都不能用,必须全部重新换。


4. 跟Structured streaming的对比


因为之前这个需求就是用spark的structured streaming来实现的,记得第一次实现的时候,就很顺利,每一个步骤官方文档都有示例代码,几乎没什么坑。


因为spark跟Hadoop是天然集成的,因此在spark代码中你只要指定一个文件路径,它就默认是HDFS,因此这个需求用spark来完成基本上是分分钟就写出来了。


但是我用flink,花了至少半天时间,倒不是因为它难,而是在找资料的过程中,会有太多干扰你的东西,这个不对,那个也不行,你都不知道最后能相信谁,只能挨个去试错。


而这,就是用新技术必须要付出的代价,而这些坑,我几年前可没少趟...


Flink虽然有很多吐槽的地方,但是优点,咱还是要来夸一夸的:


  1.  默认情况下,任务运行时,控制台的输出非常的干净,除了用户代码的显示打印外,几乎没有任何多余的输出信息,给人的感觉非常的清爽,当然,它带来的一个坏处就是你不能清楚直观的看到任务的执行过程,以及每个过程都经历了什么;

  2.  数据经过shuffle操作后,会自动对任务的分区根据key进行调整,比如原来有10个分区,聚合后变成了3个key,那么partition数也自动变成了3个,而spark是不变的,除非人工去调整;

  3.  任务界面:各有利弊,Flink的显示界面默认每3秒刷新一次,而且可以非常清楚的看运行的JVM的内存变化,可以及时的预判是否会发生OOM,这一点比spark做的好。

    但是Flink只能显示当前累计处理的数量大小,不能显示条数,以及不能显示数据的读取速度和数据处理速度,而这些spark的structured streaming是都可以的:



    Flink只能显示累计处理的数据量大小

    Spark能显示数据源读取的速率以及处理的速率


此外,spark还拥有比较丰富、直观的对数据处理的图表分析功能:


而这是Flink目前还不具备的,至少在flink on yarn里面没有这个功能。



最后
技术没有绝对的好坏,只有具体场景下基于成本考量下的取舍。

以上,就是我针对这个小小应用场景下对这两个技术的一次简单对比,后续我会用更多生产案例来对flink进行测试。

如果你有任何相关技术问题,欢迎找我交流...


你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起成长...



请使用浏览器的分享功能分享到微信等