将kafka中的流式数据通过Flink,经过简单的清洗转换,最后写到HDFS文件系统,并将其保存为CSV(普通文本)格式。

因为你一旦设置了Hadoop的环境变量,那么就相当于把Hadoop相关的配置都告诉了当前用户,这里当然就包括yarn的集群地址。因此当该用户向yarn提交flink任务时,自然而然就提交成功。

下载的flink版本

添加Hadoop环境变量

flink on yarn的提交命令

flink on yarn提交任务后的输出

创建kafka source table 写hdfs文件

package com.anryg
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* @DESC: 读取kafka数据,从DataStream到HDFS
* @Auther: Anryg
* @Date: 2022/8/14 19:08
*/
object FlinkDSFromKafka2HDFS {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment //获取流任务的环境变量
val kafkaSource = KafkaSource.builder() //获取kafka数据源
.setBootstrapServers("${kafka_node}:6667")
.setTopics("your_topic")
.setGroupId("FlinkDSFromKafka2HDFS2")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
import org.apache.flink.streaming.api.scala._ //引入隐私转换函数
val kafkaDS = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data") //读取数据源生成DataStream对象
val targetDS = kafkaDS.map(line => { //对数据源做简单的ETL处理
line.split("\\|")
}).filter(_.length == 9).map(array => (array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))
/**基于flink1.14之后新的,文件系统的sink策略,跟官网提供的不一致,有坑*/
val hdfsSink2 = StreamingFileSink.forRowFormat(new Path("hdfs://${namenode}:8020/tmp/flink_sink2"),
new SimpleStringEncoder[(String,String,String,String,String,String,String,String,String)]("UTF-8"))
//.withBucketAssigner(new DateTimeBucketAssigner) /**默认基于时间分配器*/
.withRollingPolicy( //设置文件的滚动策略,也就是分文件策略,也可以同时设置文件的命名规则,这里暂时用默认
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(300)) //文件滚动间隔,设为5分钟,即每5分钟生成一个新文件
.withInactivityInterval(Duration.ofSeconds(20)) //空闲间隔时间,也就是当前文件有多久没有写入数据,则进行滚动
.withMaxPartSize(MemorySize.ofMebiBytes(500)) //单个文件的最大文件大小,设置为500MB
.build()).build()
targetDS.addSink(hdfsSink2) //目标DataStream添加sink策略
env.execute("FlinkDSFromKafka2HDFS") //启动任务
}
}
flink的新版本(具体版本不记得了),已经不再跟Hadoop绑定(目前我用的1.15.3就是),这样就导致当你把API写好,往HDFS写数据时,运行时发现,缺少了Hadoop的相关依赖,于是必须给加上,但是错误提示并不告诉你缺少了哪些,官网也没有说,你只能一个个来试; -
按理说讲flink处理好的数据写入到HDFS,这个非常常用的文件系统应该是非常简单的事情,可是从官方文档来看,情况并不是这样的,我如果告诉你官方文档提供的API是错的,你信吗? 官方文档提供的API是这样的: 
但是,1.15.2这个版本根本就没有这个类,然后我去maven中央仓库去查了下,发现这个类对应的jar包最高的版本也只到1.14.6。

所以,只能去各种查资料,最后才知道,原来新版本的flink对于文件系统的Sink方式改成了用 StreamingFileSink 这个新类,坑爹的玩意,可是它在官网压根就没有提及,你只能各种找,然后凭经验去试错,才能最终把问题解决。 还有在查相关资料的时候,你会发现Flink的API特别的乱,同样都是写数据到HDFS,flink不同版本之间的实现方式都不一样,关键变化还不是一点点,如果之前的老版本,比如1.10左右的,想升级到比较新的版本,你会发现之前写的api完全都不能用,必须全部重新换。
4. 跟Structured streaming的对比
因为之前这个需求就是用spark的structured streaming来实现的,记得第一次实现的时候,就很顺利,每一个步骤官方文档都有示例代码,几乎没什么坑。
因为spark跟Hadoop是天然集成的,因此在spark代码中你只要指定一个文件路径,它就默认是HDFS,因此这个需求用spark来完成基本上是分分钟就写出来了。
但是我用flink,花了至少半天时间,倒不是因为它难,而是在找资料的过程中,会有太多干扰你的东西,这个不对,那个也不行,你都不知道最后能相信谁,只能挨个去试错。
而这,就是用新技术必须要付出的代价,而这些坑,我几年前可没少趟...
Flink虽然有很多吐槽的地方,但是优点,咱还是要来夸一夸的:
默认情况下,任务运行时,控制台的输出非常的干净,除了用户代码的显示打印外,几乎没有任何多余的输出信息,给人的感觉非常的清爽,当然,它带来的一个坏处就是你不能清楚直观的看到任务的执行过程,以及每个过程都经历了什么;
数据经过shuffle操作后,会自动对任务的分区根据key进行调整,比如原来有10个分区,聚合后变成了3个key,那么partition数也自动变成了3个,而spark是不变的,除非人工去调整;
-
任务界面:各有利弊,Flink的显示界面默认每3秒刷新一次,而且可以非常清楚的看到运行的JVM的内存变化,可以及时的预判是否会发生OOM,这一点比spark做的好。
但是Flink只能显示当前累计处理的数量大小,不能显示条数,以及不能显示数据的读取速度和数据处理速度,而这些spark的structured streaming是都可以的:
Flink只能显示累计处理的数据量大小
Spark能显示数据源读取的速率以及处理的速率
此外,spark还拥有比较丰富、直观的对数据处理的图表分析功能:

而这是Flink目前还不具备的,至少在flink on yarn里面没有这个功能。
