你的Spark程序变慢了,十有八九是出现了数据倾斜。可见数据倾斜是Spark性能挑战越不过去的坎。
那么什么是数据倾斜、导致数据倾斜的原因以及如何克服它以保持 Spark 应用程序的最佳性能呢?
什么是数据倾斜?
Apache Spark 中的数据倾斜指的是,正在处理的数据其在不同分区之间分布不均匀的情况。在理想的情况下,数据应该均匀分布在所有分区上,以确保最大的并行性,从而提高处理速度。然而,现实的情况是数据通常并不完全平衡,当一个或几个分区的数据量与其他分区相比不成比例时,就会出现数据倾斜的情况。
这种数据不平衡会极大地影响 Spark 应用程序的性能,导致处理时间更长、资源使用效率低下,甚至出现内存不足错误。在最坏的情况下,具有倾斜数据的单个分区可能会减慢整个 Spark 作业的速度,因为 Spark 中某个阶段的总体完成时间是由最慢的任务所花费的时间决定的。
数据倾斜的原因
Spark 应用程序中数据倾斜的潜在原因有以下几个:
-
倾斜的数据分布
现实世界的数据通常分布不均匀。某些键值可能会出现大量出现(热键),从而导致数据在分区之间分布不均。
-
分区策略不充分
Spark 中的默认分区策略可能并不总是对于您的特定数据集最有效。例如,如果某些键散列到同一分区,则默认的散列分区策略可能会导致数据倾斜。
-
Join连接操作
执行连接操作时,如果被连接的数据集中的键分布不均匀,可能会导致倾斜。在大型数据集与小型数据集通过非唯一键连接的情况下,这一点尤其突出。
-
GroupBy 操作
与连接操作类似,当某些键比其他键具有更多值时,GroupBy 操作也会导致数据倾斜。
如何处理 Spark 中的数据倾斜?
尽管数据倾斜会显着影响 Spark 应用程序的性能,但有多种策略可以帮助管理和缓解此问题:
-
自定义分区
实施自定义分区策略可以帮助在分区之间更均匀地分布数据,而不是依赖 Spark 的默认分区策略。例如,在处理数字键时,范围分区可能更有效。
-
加盐
加盐是一种将随机值(盐)附加到主键key的技术,这有助于在分区之间更均匀地分布数据。这在处理热键时特别有用。
-
动态分区修剪
动态分区修剪是 Spark 中使用的一种技术,通过跳过两个数据集中不相关分区的扫描来优化连接操作。这有助于在连接操作导致数据倾斜的情况下提高性能。
-
分割倾斜数据
另一种策略是将倾斜数据分割到多个分区。这涉及识别倾斜的主键并重新分配与这些主键相关的数据。
-
避免对大型数据集使用 GroupBy
如果可能,请避免对具有非唯一键的大型数据集使用 GroupBy 操作。诸如reduceByKey之类的替代方案(在执行分组操作之前在每个分区上本地执行组合操作)可能会更高效。

另外,在Spark AQE中有着Join时的自动倾斜处理的rule OptimizeSkewedJoin,感兴趣的可以参考文章[SPARK][SQL] 面试问题之Spark AQE新特性。
实现数据倾斜的处理
以下是一些关于如何处理 Apache Spark 中数据倾斜的处理示例:
1. 加盐技术
当数据中的某些键(热键)出现大量次数而导致偏斜时,加盐是一种有用的技术。这个想法是向主键key添加一个随机数(盐),以便具有相同主键key的数据record中现在分布在多个分区中。
让我们看看如何使用 Scala 在 Spark 中应用加盐:
```markdown
val Spark: SparkSession = ...
// 假设我们有一个存在数据倾斜的 DataFrame
val skewedDF: DataFrame = ...
// 定义一个 UDF 添加 salt
val addSaltUdf = udf((key: String) => key + "_" + Random.nextInt( 100 ))
// 将 salt 添加到 key
val saltedDF = skewedDF.withColumn( "key" , addSaltUdf(col( "key" ) )))
// 现在您可以对 saltedDF 执行操作,而不会出现数据倾斜问题
```
2. 自定义分区
如果 Spark 中的默认分区策略不能均匀分布您的数据,您可以考虑实施自定义分区策略。让我们举一个使用范围分区的例子:
```markdown
Val Spark:SparkSession = ...
//假设我们有一个带有数据倾斜的RDD
val skewedrdd:rdd [(int,string)] = ... //自定义
范围分区器
val val custompartitioner = new langepartitioner = new Rangepartitioner(10,skewedrdd)无数据倾斜问题的DD
```
3. 分割倾斜数据
如果可以识别倾斜的键,则可以将倾斜的数据和非倾斜的数据拆分为两个单独的 RDD/DataFrame 并分别处理它们。
```markdown
val Spark: SparkSession = ...
// 假设我们有一个存在数据倾斜的 DataFrame
val skewedDF: DataFrame = ...
// 倾斜的键
val skewedKeys = Seq( "key1" , "key2" , "key3" )
// 分割数据
val skewedData = skewedDF.filter(col( "key" ).isin(skewedKeys: _*))
val nonSkewedData = s kewedDF.filter(!col( "key" ).isin(skewedKeys: _*))
// 分别处理 skewedData 和 nonSkewedData
```
不过请记住,处理数据偏斜的正确策略取决于数据和应用程序的具体情况。正确应用这些策略可以显着提高 Spark 应用程序的性能。
总结
总之,数据倾斜是 Apache Spark 中处理大数据时的一个常见问题。了解导致该问题的原因以及如何有效处理它可以极大地提高 Spark 应用程序的性能和稳定性。将这些策略保留在您的工具包中,以便下次在 处理 Spark 任务中面对数据倾斜挑战时使用。