Spark Shuffle及其优化

一、什么是shuffle?(WHAT)

      shuffle 在 Spark 中表示 map task 和 reduce task 之间数据交换的过程。

二、为什么会产生 Shuffle?(WHY)

      shuffle 的目的就是为了对数据进行分组,把相同的 key 放在一起。

三、什么时候会发生shuffle?(WHEN)

      我们知道,Spark RDD 之间的关系分为窄依赖(父 RDD 和子 RDD 分区是一对一的关系)和宽依赖(父 RDD 和子 RDD 分区是一对多的关系)。

宽依赖也叫做 shuffle 依赖,所以说,发生宽依赖必然会发生 Shuffle,也就是宽依赖算子必然会发生 Shuffle 操作。而 shuffle 也是划分 Stage 的依据。

这些算子属于宽依赖算子:distinct、reduceByKey、groupByKey、sortByKey、repartition、repartitionAndSortWithinPartitions、coalesce(shuffle=true)、join、cogroup 等。

四、shuffle 的发生过程(HOW)

shuffle 处于两个 stage 之间,也就是处于 map task 和 reduce task 之间,起到数据交换的作用。shuffle 过程分为 shuffle write 和 shuffle read 两个过程。

1. Map阶段

从数据源读取数据,如果使用了 reduceByKey,aggregateByKey,combineBykey 算子,为了减少 shuffle 的数据量和 reduce 端的压力,map 端会进行预聚合。

2. Shuffle Write

Shuffle Write 就是生成 Shuffle 中间文件的过程。
逐条读取 map task 的数据,按照数据分区方法计算每条数据应该分发到哪个 reduce task 中,并写入到中间文件,持久化到磁盘上,持久化的目的是为了减少内存压力和容错。
在生成中间文件之前,数据会先存储在内存中,如果需要预聚合,则使用 AppendOnlyMap,否则使用 Buffer 数组。
RDD 每个 partition 会不断的向内存中的 Map 或者 buffer 中写入数据,当内存中的 AppendOnlyMap 或者 Buffer 集合超过阈值时,则将内存中的数据溢写到临时文件中,随后清空内存缓冲区,一直循环这个过程,直到数据全部写入。
所有数据写入完成后,每个 map task 会生成很多临时文件,以及内存中还有部分没有溢写的数据,为了减少文件数,会通过归并排序的方式对临时文件和内存中的数据进行合并,生成记录(Key,Value)键值对的 ".data" 数据文件和记录键值对所属 reduce task 的 ".index" 索引文件。

3. Shuffle Read

当 map task 完成之后,对于 map task 产生的 shuffle 文件,reduce task 通过网络从不同节点拉取属于自己的数据内容。

拉取的数据首先会放到内存中,然后进行数据处理,并写入相应的位置。如果 spark.shuffle.spilis=false,则处理后的数据将使用 AppendOnlyMap 仅写入内存中;否则,处理后的数据将使用 ExternalAppendOnlyMap 写入内存和磁盘。

五、什么是 ESS?(WHAT)

为了将数据计算和数据读取进行解耦,Spark 支持单独的服务来处理读取请求,即:ExternalShuffleService(ESS),这个服务需要常驻 nodemanager 节点,用来管理每个 executor 在 shuffle writer 过程中产生的中间数据。
总结起来就是:ESS 负责管理 shuffle write 生成的中间数据,其生命周期不依赖于 executor。

4. Reduce阶段

从 map 端拉取的数据做全局聚合。

六、Spark ESS

在 Spark 中,executor 进程除了运行 task,还要负责 shuffle write,以及给其他 executor 提供数据,当 executor 负载过高时,会影响任务的运行。
因此,spark 提供了 external shuffle service(ESS) 接口,比如 spark on yarn 的 YarnShuffleService。在 yarn 的 nodemanager 中会常驻一个 externalShuffleService 服务进程来为所有的 executor 服务,默认端口为7337。
在 spark 中 shuffleClient 有两种,一种是 blockTransferService,另一种是 externalShuffleClient。
如果开启了 ESS,则 externalShuffleClient 用来拉取 shuffle 数据,而 blockTransferService 用于获取 broadCast 数据。如果没有开启 ESS,spark 就只能使用自己的 blockTransferService 来拉取所有数据,包括 shuffle 数据和 broadcast 数据。
我们知道在大数据集群中,特别是集群规模比较大时,executor 挂掉是常有的事儿,如果开启了 ESS,即使 executor 挂掉,也不会影响 shuffle 数据的读取,ESS 提供了优化的数据本地性,减少了数据在网络中的传输。
总之,ESS 就是为了用来处理 shuffle 数据的。那么如何使用 ESS 呢?

1. 修改 yarn-site.xml

yarn-site.xml 中新增如下属性:


<property>
  <name>yarn.nodemanager.aux-servicesname>

  <value>spark_shufflevalue>
property>


<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.classname>
  <value>org.apache.spark.network.yarn.YarnShuffleServicevalue>
property>


<property>
  <name>spark.shuffle.service.portname>
  <value>7337value>
property>

2. 重启NodeManager

重启 NodeManager,会自动启动 ESS(External shuffle Service)。

3. 修改 spark-defaults.conf

spark.shuffle.service.enabled true
spark.shuffle.service.port 7337

八、Spark RSS

      处理 Shuffle 除了可以使用 ESS,还可以使用 RSS。

      ESS 的原理是 Map 任务在计算节点本地将相同 Partition 数据合并到一起;

      RSS 的原理是 Map 任务将相同 Partition 数据 Push 到远端的 RSS 服务,RSS服务 将同一 partition 的数据合并。

      这里就不得不提一下这个项目:Apache Celeborn,它是 RSS 的一种开源解决方案。大家可以自行去官网查看项目详情:https://celeborn.apache.org/

九、ESS 对比 RSS

      ESS 和 RSS 都是为了解决 spark shuffle 过程中大量小文件读写和大量磁盘IO和网络IO,其目的都是为了优化 shuffle 过程,提高 shuffle 的效率和可靠性。

1. ESS

      ESS 是一个在每个节点上运行的独立服务,它负责管理该节点上所有 Executor 生成的 Shuffle 数据。主要优势包括:

① 提高了可靠性,即使Executor进程因GC或其他原因挂掉,也不会影响Shuffle数据的可用性。

② 允许释放闲置的Executor以节省集群资源。

③ 支持动态资源分配,即Executor可以根据需要动态增加或减少。

当然它也有一些缺点:

① 脆弱的网络模型

② 低效的小文件IO

③ 缺乏Shuffle数据的Locality调度

2. RSS

      RSS 是一种将 Shuffle 数据写入远端服务的解决方案。它的优点主要包括:

① 存储和计算分离,使计算节点和存储节点能够各司其职。

② 支持动态资源分配,任务完成后可以立即释放资源。

③ 能够很好地集成资源调度组件,如Kubernetes。

④ 灵活性比较大,在实际应用中,各大公司可以根据实际情况定制自己的 RSS 服务。

⑤ 减少Shuffle文件数量和读写磁盘次数,以及提供数据备份和负载均衡。

总之:

      ESS 通过在每个节点上提供常驻服务来提高 Shuffle 数据的可靠性和可用性;

      RSS 通过将 Shuffle 数据的存储和计算分离,进一步提高了资源利用率和系统稳定性。

      选择使用 ESS 还是 RSS,或者它们的特定实现,通常取决于具体的使用场景和集群环境。


往期推荐

统一SQL网关:Kyuubi 使用体验

Trino 动态Catalog 体验



请使用浏览器的分享功能分享到微信等