来源:安瑞哥是码农
前两天咱验证了用Flink 读 HDFS 的8亿+数据写 Elasticsearch(下称ES),分别用它的 SQL API 以及 DataStream API 做了测试对比。
发现, 虽然在本地 IDEA 环境调试时,都能够顺利跑通,但是在相同的集群环境下,SQL API 是不能正常提交到集群运行的。
而对于 Flink 的 DataStream API,在集群环境下,用了3个并行,其他参数全都默认情况下,将 HDFS 的8亿+数据全部写入到 ES ,耗时14个多小时。
那么今天,咱把 Flink 换成 Spark,看对于完成同样的一件事情,对比这两者会有哪些方面的区别?
(PS:本次测试的组件环境为,HDFS3.1、Spark3.2、ES7.8)
0. 环境准备
从 Spark 的官网我们可以知道,Spark 既可以通过普通的「批处理」方式来读取 HDFS 写 ES,也可以用「流计算」的方式读 HDFS 写 ES。
为此,我准备写两套代码,一起来测试这两种方式之间的区别,以及跟之前 Flink 的区别。
在编码前,开发环境需要添加下面的必要依赖:
这里需要注意的是,由于 Maven 中央仓库没有找到跟我当前 ES 集群一样的版本(7.8),只能找了一个最近的(7.12),但好在完全兼容。
而对于 HDFS 的相关依赖来说,因为 Spark 天然是支持 hadoop 生态的,自然在 spark 的核心包里面就已经包括了,所以这里不用额外引入。
开发环境准备完毕,下一步,开始撸代码。
1. Spark Batch API 的实现
一般来说,用 Spark 读取 HDFS,大家最容易想到,也是最简单的,就是用这种批处理的方式。
对于用 Spark Batch API 读取 HDFS 写 ES,调试之后可正常运行的代码如下:
package com.anryg.bigdata.hdfs
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @DESC: Spark 以批的方式读取 HDFS 中的8亿数据,写入到 ES 中
* @Auther: Anryg
* @Date: 2024/03/28 15:10
*/
object SparkReadHDFS2ES {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ReadHDFS2ES")/*.setMaster("local[*]")*/
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._ /**引入隐式转换对象*/
val rawDF = spark.read.option("header",false)
.csv("hdfs://192.168.211.106:8020/tmp/from_doris")
val targetDS = rawDF.map(row => {
var rcode = "99"
var query_type = "99"
val client_ip = row.getString(0)
val domain = row.getString(1)
val time = row.getString(2)
val target_ip = row.getString(3)
try {
if (row.getString(4).length == 1) rcode = row.getString(4)
} catch {
case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
}
try {
if (!row.getString(5).equals("") && !row.getString(5).contains("N")) query_type = row.getString(5)
} catch {
case e:Exception => println(row,e)
}
val authority_record = row.getString(6)
val add_msg = row.getString(7)
val dns_ip = row.getString(8)
val id = client_ip + domain + time //确定唯一id
(id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
}).toDF("_id", "client_ip", "domain", "time", "target_ip", "rcode", "query_type", "authority_record", "add_msg", "dns_ip")
/**批方式写入*/
targetDS.write
.format("org.elasticsearch.spark.sql") //指定外部输出为ES
.mode(SaveMode.Append) //必须指定
.option("es.nodes","192.168.221.173")
.option("es.port","9201")
.option("es.write.operation","upsert")/**指定当前模式必须指定id*/
.option("es.mapping.id","_id") //指定索引的_id
.option("es.mapping.exclude","_id") //唯一id不作为列写入
.save("data_from_spark")
}
}
代码非常简单,有兴趣的同学,可以对比一下跟上一篇 Flink 读 HDFS 写 ES 在风格上的区别。
2. Spark Streaming API 的实现
相比 Spark 的 batch API,它的 structed streaming API 会略显复杂一点点。
同样,被我调试后能正常运行的代码如下:
package com.anryg.bigdata.hdfs
import java.util.concurrent.TimeUnit
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
/**
* @DESC: Spark 以流的方式读取 HDFS 中的8亿数据,写入到 ES 中
* @Auther: Anryg
* @Date: 2024/03/28 16:28
*/
object SparkReadHDFS2ESByStream {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ReadHDFS2ES")/*.setMaster("local[*]")*/
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._ /**引入隐式转换对象*/
/**流方式读取时,必须要额外指定schema*/
val schema = new StructType()
.add("client_ip", "string")
.add("domain", "string")
.add("time", "string")
.add("target_ip", "string")
.add("rcode", "string")
.add("query_type", "string")
.add("authority_record", "string")
.add("add_msg", "string")
.add("dns_ip", "string")
/**以流的方式读取HDFS*/
val rawDF = spark.readStream.option("header",false)
.schema(schema)/**必须添加schema*/
.csv("hdfs://192.168.211.106:8020/tmp/from_doris")
val targetDS = rawDF.map(row => {
var rcode = "99"
var query_type = "99"
val client_ip = row.getString(0)
val domain = row.getString(1)
val time = row.getString(2)
val target_ip = row.getString(3)
try {
if (row.getString(4).length == 1) rcode = row.getString(4)
} catch {
case e:Exception => println(row,e) //捕获异常数据异常情况,防止进程因此退出
}
try {
if (!row.getString(5).equals("") && !row.getString(5).contains("N")) query_type = row.getString(5)
} catch {
case e:Exception => println(row,e)
}
val authority_record = row.getString(6)
val add_msg = row.getString(7)
val dns_ip = row.getString(8)
val id = client_ip + domain + time //确定唯一id
(id, client_ip, domain, time, target_ip, rcode, query_type, authority_record, add_msg, dns_ip)
}).toDF("_id", "client_ip", "domain", "time", "target_ip", "rcode", "query_type", "authority_record", "add_msg", "dns_ip")
/**以流的方式写入ES*/
targetDS.writeStream
.format("org.elasticsearch.spark.sql") //指定外部输出为ES
.option("es.nodes","192.168.221.173")
.option("es.port","9201")
.option("es.write.operation","upsert") /**指定当前模式必须指定id*/
.option("es.mapping.id","_id") //指定索引的_id
.option("es.mapping.exclude","_id") //唯一id不作为列写入
.option("checkpointLocation","hdfs://192.168.211.106:8020/tmp/commit_log/SparkReadHDFS2ESByStream") //用来保存ES commit log,用的流方式必须指定
.start("data_from_spark01")
.awaitTermination()
}
}
相比上面的 batch API,用 streaming 的方式,除了 API 的写法上,会略有不同外,核心区别有下面2点:
1. 数据源必须指定 schema
2. 数据在 sink 到 ES 时,必须指定 checkpoint
而这个 checkpoint,具体指的是提交给 ES 的 commit log,目的是为了任务中途挂掉重启时,不会重复读取之前处理过的数据。
3. 运行状态对比
为了能比较清楚的展现这两种 API 的区别,我把这个批处理任务,跟流处理任务同时都给跑起来,并同时写入到2个不同的 ES 索引里。
并且,这两个任务都采用跟上次 Flink 读 HDFS 写 ES 一毛一样的集群环境、并行度,以及内存和CPU配置,公平得厉害。
从这两个 Spark 的运行任务来看,首先,它们的 DAG 有些不一样:
批处理模式的DAG
流处理模式的DAG
可以看到,批处理模型的 DAG,要比流模型下步骤更多,更复杂一点。
其次,它们在运行中,和完成时的效率,也略有差异。
批处理模式运行中的状态
流处理模式运行中的状态
批处理运行完成的状态
流处理模式完成的状态
从这两者运行完成花费的时间来看,流处理模式下的效率要更快那么一neinei。
4. Flink 跟 Spark 效率对比
最终,在我一天一夜地蹲守下,这两货终于跑完了,那叫一个慢啊。
对于这次将8亿+的 HDFS 数据全部写入到 ES,Flink 跟 Spark 的执行时间如下:
1. Flink 总耗时:14小时24分;
2. Spark batch 总耗时:17小时54分;
3. Spark streaming 总耗时:17小时48分。
从对比结果来看,基于当前场景,Flink 的执行效率,明显要比 Spark 高一些。
但是,从我对这几种不同 API 的代码调试过程,所花费的时间来看,Spark 的效率则更高一些,坑也相对更少一点。
从这个对比结论来看,如果是同样的场景,你会选谁?
(PS:本次代码已经同步更新到gitee)