Spark 对比 Flink 写 Elasticsearch,谁更牛逼?

来源:安瑞哥是码农


前两天咱验证了用Flink 读 HDFS 的8亿+数据写 Elasticsearch(下称ES),分别用它的 SQL API 以及 DataStream API 做了测试对比。


Flink写ES,SQL跟DataStream API,哪个更恼火?


发现, 虽然在本地 IDEA 环境调试时,都能够顺利跑通,但是在相同的集群环境下,SQL API 是不能正常提交到集群运行的。


而对于 Flink 的 DataStream API,在集群环境下,用了3个并行,其他参数全都默认情况下,将 HDFS 的8亿+数据全部写入到 ES ,耗时14个多小时


那么今天,咱把 Flink 换成 Spark,看对于完成同样的一件事情,对比这两者会有哪些方面的区别?


(PS:本次测试的组件环境为,HDFS3.1、Spark3.2、ES7.8)



0. 环境准备


从 Spark 的官网我们可以知道,Spark 既可以通过普通的「批处理」方式来读取 HDFS 写 ES,也可以用「流计算」的方式读 HDFS 写 ES。


为此,我准备写两套代码,一起来测试这两种方式之间的区别,以及跟之前 Flink 的区别。


在编码前,开发环境需要添加下面的必要依赖:


这里需要注意的是,由于 Maven 中央仓库没有找到跟我当前 ES 集群一样的版本(7.8),只能找了一个最近的(7.12),但好在完全兼容。


而对于 HDFS 的相关依赖来说,因为 Spark 天然是支持 hadoop 生态的,自然在 spark 的核心包里面就已经包括了,所以这里不用额外引入。


开发环境准备完毕,下一步,开始撸代码。



1. Spark Batch API 的实现


一般来说,用 Spark 读取 HDFS,大家最容易想到,也是最简单的,就是用这种批处理的方式。


对于用 Spark Batch API 读取 HDFS 写 ES,调试之后可正常运行的代码如下:


package com.anryg.bigdata.hdfs

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveModeSparkSession}

/**
  * @DESC: Spark 以批的方式读取 HDFS 中的8亿数据,写入到 ES 中
  * @Auther: Anryg
  * @Date: 2024/03/28 15:10
  */

object SparkReadHDFS2ES {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ReadHDFS2ES")/*.setMaster("local[*]")*/
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._  /**引入隐式转换对象*/

        val rawDF = spark.read.option("header",false)
                .csv("hdfs://192.168.211.106:8020/tmp/from_doris")

        val targetDS = rawDF.map(row => {
            var rcode = "99"
            var query_type = "99"
            val client_ip = row.getString(0)
            val domain = row.getString(1)
            val time = row.getString(2)
            val target_ip = row.getString(3)
            try {
                if (row.getString(4).length == 1) rcode = row.getString(4)
            } catch {
                case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
            }
            try {
                if (!row.getString(5).equals("") && !row.getString(5).contains("N")) query_type = row.getString(5)
            } catch {
                case e:Exception => println(row,e)
            }
            val authority_record = row.getString(6)
            val add_msg = row.getString(7)
            val dns_ip = row.getString(8)

            val id = client_ip + domain + time //确定唯一id
            (id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
        }).toDF("_id""client_ip""domain""time""target_ip""rcode""query_type""authority_record""add_msg""dns_ip")

        /**批方式写入*/
        targetDS.write
            .format("org.elasticsearch.spark.sql")  //指定外部输出为ES
            .mode(SaveMode.Append//必须指定
            .option("es.nodes","192.168.221.173")
            .option("es.port","9201")
            .option("es.write.operation","upsert")/**指定当前模式必须指定id*/
            .option("es.mapping.id","_id"//指定索引的_id
            .option("es.mapping.exclude","_id"//唯一id不作为列写入
            .save("data_from_spark")
    }
}

代码非常简单,有兴趣的同学,可以对比一下跟上一篇 Flink 读 HDFS 写 ES 在风格上的区别。



2. Spark Streaming API 的实现


相比 Spark 的 batch API,它的 structed streaming API 会略显复杂一点点。


同样,被我调试后能正常运行的代码如下:


package com.anryg.bigdata.hdfs

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

/**
  * @DESC: Spark 以流的方式读取 HDFS 中的8亿数据,写入到 ES 中
  * @Auther: Anryg
  * @Date: 2024/03/28 16:28
  */

object SparkReadHDFS2ESByStream {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ReadHDFS2ES")/*.setMaster("local[*]")*/

        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        import spark.implicits._  /**引入隐式转换对象*/

            /**流方式读取时,必须要额外指定schema*/
        val schema = new StructType()
            .add("client_ip""string")
            .add("domain""string")
            .add("time""string")
            .add("target_ip""string")
            .add("rcode""string")
            .add("query_type""string")
            .add("authority_record""string")
            .add("add_msg""string")
            .add("dns_ip""string")

            /**以流的方式读取HDFS*/
        val rawDF = spark.readStream.option("header",false)
                .schema(schema)/**必须添加schema*/
                .csv("hdfs://192.168.211.106:8020/tmp/from_doris")

        val targetDS = rawDF.map(row => {
            var rcode = "99"
            var query_type = "99"
            val client_ip = row.getString(0)
            val domain = row.getString(1)
            val time = row.getString(2)
            val target_ip = row.getString(3)
            try {
                if (row.getString(4).length == 1) rcode = row.getString(4)
            } catch {
                case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
            }
            try {
                if (!row.getString(5).equals("") && !row.getString(5).contains("N")) query_type = row.getString(5)
            } catch {
                case e:Exception => println(row,e)
            }
            val authority_record = row.getString(6)
            val add_msg = row.getString(7)
            val dns_ip = row.getString(8)

            val id = client_ip + domain + time //确定唯一id
            (id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
        }).toDF("_id""client_ip""domain""time""target_ip""rcode""query_type""authority_record""add_msg""dns_ip")

        /**以流的方式写入ES*/
        targetDS.writeStream
            .format("org.elasticsearch.spark.sql")  //指定外部输出为ES
            .option("es.nodes","192.168.221.173")
            .option("es.port","9201")
            .option("es.write.operation","upsert"/**指定当前模式必须指定id*/
            .option("es.mapping.id","_id"//指定索引的_id
            .option("es.mapping.exclude","_id"//唯一id不作为列写入
            .option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/commit_log/SparkReadHDFS2ESByStream"//用来保存ES commit log,用的流方式必须指定
            .start("data_from_spark01")
            .awaitTermination()
    }
}


相比上面的 batch API,用 streaming 的方式,除了 API 的写法上,会略有不同外,核心区别有下面2点:


1. 数据源必须指定 schema


2. 数据在 sink 到 ES 时,必须指定 checkpoint


而这个 checkpoint,具体指的是提交给 ES 的 commit log,目的是为了任务中途挂掉重启时,不会重复读取之前处理过的数据。



3. 运行状态对比


为了能比较清楚的展现这两种 API 的区别,我把这个批处理任务,跟流处理任务同时都给跑起来,并同时写入到2个不同的 ES 索引里。


并且,这两个任务都采用跟上次 Flink 读 HDFS 写 ES 一毛一样的集群环境、并行度,以及内存和CPU配置,公平得厉害。


从这两个 Spark 的运行任务来看,首先,它们的 DAG 有些不一样:


 

批处理模式的DAG



流处理模式的DAG

可以看到,批处理模型的 DAG,要比流模型下步骤更多,更复杂一点。


其次,它们在运行中,和完成时的效率,也略有差异。


处理模式运行中的状态

处理模式运行中的状态


处理运行完成的状态

处理模式完成的状态


从这两者运行完成花费的时间来看,流处理模式下的效率要更快那么一neinei。



4. Flink 跟 Spark 效率对比


最终,在我一天一夜蹲守下,这两货终于跑完了,那叫一个慢啊。


对于这次将8亿+的 HDFS 数据全部写入到 ES,Flink 跟 Spark 的执行时间如下:


1. Flink 总耗时:14小时24分


2. Spark batch 总耗时:17小时54分


3. Spark streaming 总耗时:17小时48分


从对比结果来看,基于当前场景,Flink 的执行效率,明显要比 Spark 高一些


但是,从我对这几种不同 API 的代码调试过程,所花费的时间来看,Spark 的效率则更高一些,坑也相对更少一点


从这个对比结论来看,如果是同样的场景,你会选谁?



(PS:本次代码已经同步更新到gitee)

请使用浏览器的分享功能分享到微信等