Celeborn 真的能对 shuffle 提速吗?(上)

来源:安瑞哥是码农


也不知道是从哪听说的这个名字,可能是某篇文章,或者是群里哪个小伙伴偶然提到了这么一嘴,第一次看到时,把我愣了一下,因为之前用过一款可以用来管理和监控 Elasticsearch 的软件,叫「Cerebro」,这两长的那叫一个像啊,第一反应还以为这玩意推出新功能了呢。


定睛一看,原来是搞错了,网上一查,说 Celeborn 是一款专门为计算引擎的「shuffle」功能提速的软件,居然还可以这样。

这一下子就激发了我想测试一把的冲动。


PS:本文以 hadoop 3.1 + spark 3.2 + Celeborn0.32-incubating 进行测试验证。



0. 官网解读


找到 celeborn 的官网(不用梯子)(https://celeborn.apache.org/docs/latest/),先通篇大致瞄了一眼文档内容,描述的还算详尽,我关心的内容几乎都包括,尤其是部署部分,比较满意,但缺点就是,不支持中文。


从官网给出的功能解释来看,celeborn 优化计算引擎在处理 shuffle 操作时的原理,就是将每次 mapper 之后,在进入 reducer 分区之前的数据,原本是要在本地磁盘进行排序存储的,给更改为将这部分排序数据推送到 celeborn 集群中的磁盘


图片来自 Celeborn 官网


而在 shuffle 数据推送到 celeborn 集群的过程中,数据会提前分好区,并将不同的分区数据,推送到不同的 Celeborn 机器上(worker),这样一来,下游的 reducer 任务来取 shuffle 数据时,就能直截了当去单个的目标服务器(worker)取到目标数据,减少网络 IO量,以及网络连接数。


这,就是它能对 shuffle 提速的核心原理,你们看了之后什么感觉?


我的第一感觉就是:没有感觉,因为我不太信。


当然,这个 celeborn 还宣称对于传统的分布式计算,还有如下2点的优化:


  1. 由原来的 shuffle 数据只能落到本地磁盘,通过 celeborn 可以配置分布式文件系统,比如HDFS,实现「计算跟shuffle数据解耦」;(这有多大的必要?) 


2. 以前 shuffle 的数据因为只能存在计算节点的本地,所以没有备份,而通过 celeborn,即便你配置 shuffle 数据存储的也是本地磁盘,它也能给你整个备份。(真的需要吗?)


我之前说过,官方文档毕竟过于官方,它说啥都是好的,而且「严肃刻板」,没有温度,不像我的文章,让你乐呵呵的就把技术给学了,对吧。


一切花哨的描述在实践面前,都会显得那么渺小和幼稚,这玩意到底是不是它描述的那样好使,咱把它「跑起来看看。



1. 软件部署


想要把这个 celeborn 耍起来,是需要一定成本的,它是一个分布式的「服务式」集群,因此需要先部署,部署方式类似我们的 HDFS。


是一个典型的 master/slaver 架构,其中 master 可以部署多个,来满足 HA,而 slaver 则是用来管理计算引擎产生 shuffle 数据的。


找到它的下载地址:https://celeborn.apache.org/download/


我这里选择下载它的最新版本安装包:


从这个截图提供的信息来看,它还是一个身处娘胎里的宝宝(孵化中),心里不免对这个软件宽容了一点。


下载到服务器之后,解完压缩包,它长这样:


从文件夹中就可以看出来,人家是明确支持 Spark 跟 Flink 的。


进一步查看这个文件夹里面的 jar 包情况可以知道,Celeborn 现在支持 spark2.x 跟 spark3.x,以及 Flink 1.14、1.15、1.17(居然少了个1.16)。


解压完之后,下一步就是要对其配置,目前来看,还没有个分布式服务能脱离这一步的。


进入到conf目录,需要修改下面2个配置文件:


1. celeborn-env.sh : (需要你从 celeborn-env.sh.template 这个模板文件复制过来),这个文件主要用来配置 master 节点以及 worker 节点的内存,玩法跟 hadoop-env.sh 是一样的;


我的集群配置


2. celeborn-defaults.conf: (同样需要你从 celeborn-defaults.conf.template 这个模板文件复制过来),这里主要配置 master 的服务器地址、端口,以及 master 的HA(也可以不用)、worker 的数据存储目录。


这里有个地方可能需要注意一下的是,在官网给的模板里:


它把本地磁盘的配置跟 HDFS 的配置写在了一起,但你要知道,在你的真实环境里,这里选择一个就可以了。


我当前集群情况比较特殊,因为 yarn 的机器分布,跟 HDFS 的机器分布情况并不完全重叠,为了达到更好的测试效果,我这里 Celeborn 的 worker 节点,为了选择跟 yarn 集群完全重合的机器,所以存储位置配置的是本地磁盘方式。




值得注意的是,虽然官方文档没有明确说明 Celeborn 可以配合哪些任务运行方式(local、yarn、standalone)进行使用。


但从我的实际验证来看,以上3种,它应该都可以(已经验证了 loacal 跟 yarn 模式),具体如何在开发环境中配置,详见下文。

配置还算简单,只需要在一台服务器上,把以上这些配置都配置好,然后将他们分发到你的目标机器上。


最后,在不同的机器上,通过启动不同的脚本,来启动对应的 master 以及 worker 服务。


根据我的经验,一般建议你在 spark 任务提交的客户端节点,来启动 Celeborn 的 master,而在 yarn 服务所在的节点,来启动 worker 服务


至于为什么?数据放在离计算最近的位置(计算跟存储同集群),不管磁盘 IO 还是网络 IO,它的成本一定是最低的


选定好机器,把软件包分发到各个机器之后,部署的最后一步就是启动服务了(不需要做 ssh 免密之类的乱七八糟操作)。


在 master 节点,启动 master 脚本:



启动之后,我这里并没有出现官网描述的那样,有对应的日志内容输出,因为日志文件里面什么玩意都没有,空的。


但是别急,只要日志文件里没有报错内容产生,那它就是启动成功了。


然后,在各个 worker 节点启动 worker脚本:


官网在描述启动 worker 脚本时,有些地方说需要在后面加上 master 的 IP 和端口的参数,经过验证,其实是不需要的,因为配置文件里已经有了呀,在启动命令里再写一遍,不是画蛇添足嘛。


同样,启动这个 worker 进程后,如果正常,日志文件也是什么内容都没有,主打一个节省存储空间。


至此,Celeborn 这个分布式服务的基础部署工作,全部完成。



2. 开发环境准备


接下来,就是准备开发环境,老规矩,虽然 celeborn 支持多种计算引擎,但咱今天还是以 Spark 为例,进行验证测试。


原因很简单,通过用 spark 的批处理模式,来对一个大数据集进行计算,通过观察其利用 Celeborn 加速时需要的计算时间,以及不用 Celeborn 计算时需要的时间,两者一对比,就能直观的看出 Celeborn 在其中所起的作用


不利用 Celeborn 的 spark 程序好说,现在,咱就来看看对于一个需要利用 Celeborn 的 spark 程序,应该怎么在你的 IDEA 工具里配置开发环境。


你的 Spark 程序想要跟 Celeborn 发生关系,要跟他进行通信,必要的配置,或者依赖 jar 包是不是就得安排上。


这个能调通,直接利用 Celeborn 的样例代码长这样:


package com.anryg.bigdata.celeborn

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * @DESC: 利用 Celeborn 开发的 Spark 任务
  * @Auther: Anryg
  * @Date: 2024/1/23 16:00
  */

object SparkWithCeleborn {

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("SparkWithCeleborn")/*.setMaster("local[*]")*/

        if (args(1) == "1") celebornConf(sparkConf) //控制是否打开Celeborn的开关

        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        val rawDF = spark.read.option("header",true).csv(args(0)) /**批处理方式读取数据源*/
        
        rawDF.createOrReplaceTempView("t"/**创建表名*/

        spark.sql(args(2)).show(args(3).toInt, false/**根据不同的SQL进行计算,然后将计算结果打印输出*/
    }

    /**配置 Celebron 相关配置*/
    private def celebornConf(sparkConf: SparkConf): Unit ={
        sparkConf.set("spark.shuffle.manager""org.apache.spark.shuffle.celeborn.SparkShuffleManager")
        sparkConf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")
        sparkConf.set("spark.celeborn.master.endpoints""192.168.211.106:9097")
        sparkConf.set("spark.shuffle.service.enabled""false")
        sparkConf.set("spark.celeborn.client.push.replicate.enabled""true")
        sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled""false")
        //for aqe
        sparkConf.set("spark.sql.adaptive.enabled""true")
        sparkConf.set("spark.sql.adaptive.skewJoin.enabled""true")
    }
}

看着好像是那么回事,但直接运行是不行的:



很明显还缺 jar 包,缺谁呢?


缺这个:


其实这个包,就是一开始 Celeborn 家目录下,spark 子目录下的这个包:


只不过,我现在需要在开发环境的 pom 文件里,要把它引入过来,以调通我 local 模式下的运行。


于是,需要往现有的 pom 文件中,引入以下依赖:


至此,基于 Celeborn 的 spark 开发环境就算配置好了。



3. 小试牛刀


Celeborn 服务部署好了,对应的 spark 程序也准备好了,万事俱备,就差一跑。


因为只是最基础的测试功能是否可行,以及代码运行是否通畅,所以这次「处女跑」,就直接在 IDEA 上进行了。


我找了个 3G 的数据集,跑了个简单的聚合(肯定是带shuffle的),通过启动 Celeborn 功能后,可以看到日志的输出多了以下内容:



从日志中可以看出来,它把 shuffle 数据写入到了哪些机器上,以及哪个机器是 primary,哪个机器是 replica (shuffle数据的主备)。


同时,从 Celeborn 的 worker 节点 shuffle 数据目录来看,确实写入数据了:


为了截这个图(取其中的一个数据目录),眼睛都给我瞪疼了,因为这个 shuffle 过程一结束,这个数据就被删了,主打一个转瞬即逝。


在 IDEA 的本地环境里,通过启用 Celeborn 跑完一个简单的聚合任务,所花时间为:81秒


然后,我又用同样的任务,在不启用 Celeborn 的情况下,跑完程序所花时间为:84秒


但是,仅凭这3秒的差距,就能说明 Celeborn 能起到真正加速的目的吗?


后续我又反复测试了几次,发现每次启用 Celeborn ,任务跑完所花的时间确实要比不启用 Celeborn 快一点,但也就10秒之内的差距,我觉得这暂时还不能说明什么问题。



最后


从这次对 Celeborn 的初步体验来看,首先,官网对它的部署,和必要配置描述的比较清楚,所以它的服务部署部分还挺顺利,过程几乎没有坑。


其次,是对于开发环境的准备,虽然官方文档没有给出直接的步骤,但是一些必要信息都可以从中找到,所以过程也很顺利。


但是对于 Celeborn 能对一个带 shuffle 过程的 spark 任务能带来多少加速提升,目前仍然需要打个问号?


如果仅凭上面这个简单的对比,就能说明 Celeborn 确实对 spark 任务的 shuffle 有多少优化作用,显然有点站不住脚。


 但今天这篇文章的篇幅有点过长了,写到这就已经把我累够呛。



请使用浏览器的分享功能分享到微信等