Flink写ES,SQL跟DataStream API,哪个更恼火?

来源:安瑞哥是码农


上周,在添加了「执行内存」请求超时时间两个关键性参数之后,分别用 Flink 跟 Doris 自带的数据导出方式,终于把那张困扰了我许久的 Doris 大表给导出成功了。


以 Doris 自带的导出方式为例,导出到 HDFS 之后(耗时22分钟),特意瞅了一眼数据量,跟原始的 Doris 表数量一致。



说明 Doris 自带的这种导出方式,还是靠谱的,只不过呢,缺点是只能导出到几个少数指定的文件系统里


因为我的最终目的,是要把这份数据写入到 Elasticsearch (下称ES),以便后续拿它跟 Doris 做查询对比测试。


那么今天,咱就准备把这份8亿+的 HDFS 数据,尝试用 Flink 给写入到 ES 中,看整个过程又会不会出现什么幺蛾子?


这不出意外的话,意外,它就来了。


(PS:当前测试用的组件版本分别为 Hadoop3.1、Flink1.15.3,以及 Elasticsearch7.8)



0. 环境准备


用 Flink 读取 HDFS 的数据,然后再把数据写入到 ES,在我看来,这应该是一再寻常不过的业务需求,所以 Flink 必须可以办到。


打开 Flink 的官网,并没有找到直接的 HDFS connector,想要 Flink 跟 HDFS 发生关系,你得往这看:



之所以一开始选择 Table API,就想着咱能用 SQL 搞定的事情,就没有必要去写那些复杂的代码了,而一般情况下,Table API 能支持的 connector,基本上 DataStream API 也能支持(下面会展开讲)。


但是,以我多年来阅读官网的经验来看,对于这部分的描述是非常粗糙的。


首先,这个 Table API 文档,并没有像其他 connector 一样直接告诉你,读(或写) HDFS 时,你需要添加额外的 pom 依赖。


而根据躺坑经验,下面这些依赖,你必须得有:




此外,因为我写入到 HDFS 的文件格式为 CSV,而程序到时候去读取的时候,需要指定读取的 format,对于 Flink 来说,还需要额外加上下面的依赖。




至于为什么要用 provided 修饰,因为 Flink 客户端的 lib 目录里面有,编译打包的时候,没有必要打进去。


接着,因为数据最终是要写到 ES 中去的,而我当前 ES 集群的版本为7.8,刚好 Flink 目前只支持 6.x 跟 7.x 的 ES,所以还需要引入下面 ES 的依赖:




好了,必要的开发依赖添加妥当,下面就开始根据需要编码。



1. SQL API 方式


因为 SQL API 是最简单的编码方式,所以咱优先考虑用它来实现,只不过呢,编码方式简单是真的,但扯淡这件事情,它也是真的。


根据官网的要求,调试之后完整的 Flink 读 HDFS 写 ES 的代码如下:


package com.anryg.hdfs

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: Flink SQL 读取 HDFS 上8亿数据,写 ES
  * @Auther: Anryg
  * @Date: 2024/3/22 10:46
  */

object FlinkSQLReadHDFS2ES {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(30000L)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkReadHDFS2ES")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val tableEnv = StreamTableEnvironment.create(env)

        /**创建 HDFS 数据的表映射*/
        tableEnv.executeSql(
            """
              |CREATE TABLE data_from_hdfs (
              |`client_ip` STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING
              |)
              |WITH (
              |    'connector' = 'filesystem',
              |    'path' = 'hdfs://192.168.211.106:8020/tmp/from_doris',
              |    'format' = 'csv'
              |)
            """
.stripMargin)

        /**映射要写入的ES索引*/
        tableEnv.executeSql(
            """
              |CREATE TABLE data2es (
              |`client_ip` STRING,
              |domain STRING,
              |`time` STRING,
              |target_ip STRING,
              |rcode STRING,
              |query_type STRING,
              |authority_record STRING,
              |add_msg STRING,
              |dns_ip STRING,
              |PRIMARY KEY (client_ip,domain,`time`) NOT ENFORCED //指定ES的_id
              |) WITH (
              |  'connector' = 'elasticsearch-7',
              |  'hosts' = 'http://192.168.221.173:9201',
              |  'index' = 'data_from_flink'
              |);
            """
.stripMargin)

        /**写入数据到ES*/
        tableEnv.executeSql(
            """
               |INSERT INTO data2es
               |select
               |*
               |from
               |data_from_hdfs
            """
.stripMargin)
    }
}


整个内容非常简单,也能顺利跑起来。


但是,它娘的,折腾了半天,它居然只能在本地模式运行。


但凡我把它打包,尝试放到 YARN 集群跑,它总能抛出下面的异常:



乍一看,是缺包,但其实它不缺,这个类包含的 jar 包已经打进去了。


又一看,以为是 jar 包冲突,但一排查发现,有这个类的,人家就一个 jar,想冲突都找不到对手


丫的,奇奇怪怪的问题又出现了,接下来,我又把 pom 文件的所有依赖上下打量了一番,几个有嫌疑的包我也都更新了,发现没有问题呀。


算球,先这样吧!


当前的 SQL API 在本地能跑(local模式),HDFS 上的8亿多数据也能顺利写进去,但 yarn 集群模式不行,不开心。


但如果我告诉你,换成 DataStream API,一模一样的功能,它就能行,你信不?



2. DataStream API 方式


还是参考官网,只不过呢,我依然觉得这部分的官方文档写的不咋地,从主文档的内容来看,




你是看不到什么有价值的代码信息的,需要你点开这个 format 的链接,然后跳转到对应的文件格式内容部分,这里的样例代码才有参考价值。


经过一番折腾调试之后,用 Flink Datastream API 顺利读取 HDFS 写 ES 的代码如下:


package com.anryg.hdfs

import java.time.Duration
import java.util

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilderElasticsearchSinkRequestIndexer}
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.{StreamFormatTextLineInputFormat}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests


/**
  * @DESC: Flink DS 读取 HDFS 上8亿数据,写 ES,这个既能本地运行,也可以提交到集群
  * @Auther: Anryg
  * @Date: 2024/3/25 10:46
  */

object FlinkDSReadHDFS2ES {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(30000L)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSReadHDFS2ES")
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略
        env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

        val fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat,new Path("hdfs://192.168.211.106:8020/tmp/from_doris"))
                                    .monitorContinuously(Duration.ofSeconds(30))
                                    .build()

        import org.apache.flink.streaming.api.scala._  //引入隐私转换函数

        val fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"file-from-hdfs-2ES")

        val builder = new Elasticsearch7SinkBuilder()
                .setHosts(new HttpHost("192.168.221.173",9201,"http"))
                .setEmitter((element:String, context:SinkWriter.Context, indexer: RequestIndexer) =>{

                    val array = element.split(",")

                    var client_ip = ""var domain = ""var time = ""var target_ip = ""var rcode = "99"var query_type = "99"var authority_record = ""var add_msg = ""var dns_ip = ""
                    try {
                        if (array(0) != null) client_ip = array(0)
                        if (array(1) != null) domain = array(1)
                        if (array(2) != null) time = array(2)
                        if (array(3) != null) target_ip = array(3)
                        if (array(4) != null) rcode = array(4)
                        if (array(5) != null) query_type = array(5)
                        if (array(6) != null) authority_record = array(6)
                        if (array(7) != null) add_msg = array(7)
                        if (array(8) != null) dns_ip = array(8)
                    } catch {
                        case e: Exception => println(element,e)
                    }

                    val id = client_ip + domain + time /**确定唯一id*/

                    val map = new util.HashMap[String,String]() /**必须是Java的map类型*/
                    map.put("client_ip", client_ip)
                    map.put("domain", domain)
                    map.put("time", time)
                    map.put("target_ip", target_ip)
                    map.put("rcode", rcode)
                    map.put("query_type", query_type)
                    map.put("authority_record", authority_record)
                    map.put("add_msg", add_msg)
                    map.put("dns_ip", dns_ip)

                    val indexRequest = Requests.indexRequest()
                        .index("data_from_flink01"/**目标索引*/
                        .id(id) /**设置ES索引的_id字段*/
                        .source(map)

                    indexer.add(indexRequest)
                }
                ).build()

        fileDS.sinkTo(builder.asInstanceOf[ElasticsearchSink[String]])/**这里需要进行强制转换*/

        env.execute() //启动任务,action按钮
    }
}


上述代码,除了必要配置,其他一切都采用的默认的(目的是要跟下次的 Spark 做对比),相比于上面的 SQL API,这个 Datastream API 写起来难度要更大一点(官网提供的内容不能全信),但好在靠谱。


它的最大优点在于:既能在本地运行,也能提交到 yarn 集群运行


能看到下面这个UI界面,我就放心了。




因为只用了3个并行度,所以从日志信息来看,8亿多数据全部写入到 ES,从昨天的 15:48 到第二天凌晨的 6:12,花了总共14个多小时


当然,我相信,通过提高并行度,以及调整数据在写入 ES 的相关参数,肯定是可以提高写入效率的。



最后


从这次 Flink 的 SQL API 以及 Datastream API 对比来看,相同的依赖,相同的业务处理逻辑,按理说,表现应该相同才对。


但是,它偏不,总能在你不经意的时候,给你使点绊子,恶心你一下。


两套不同 API,用 Datastream 方式可以提交集群正常运行,而 SQL 方式怎么着都不行,你说是依赖问题吧,人家本地跑的好好的;你要说是集群环境问题吧,人家 Datastream 一点事没有。


上哪说理去?


基于当前场景,Datastream API 更靠谱。

请使用浏览器的分享功能分享到微信等