Apache Spark:我是否应该重分区?

同样的Spark任务,分区不同,最终的任务执行时效也是相差很大。

如果分区数太小,则并行度较低,它可能会导致严重的 GC 问题或 OOM。此外,如果某些任务处理的数据比其他任务多得多(即数据倾斜),或者由于磁盘较旧、CPU 频率较低等原因运行时间较长,整个集群的资源利用率就会降低,当前阶段的执行时间会更长。这时候就需要通过增加分区数,减少某个分区处理的数据量,可以减轻这类问题的影响。

当然,如果分区值太大,可能会增加调度的开销,因为会有太多的小reduce任务和许多小文件生成,也会导致性能问题。

可以看出,在分布式环境中,正确的数据分布是提升任务性能的关键。

在Spark 的DataFrame API中,有一个函数repartition()可以控制Spark集群上的数据分布。然而,想要有效的使用该函数并不简单,这是因为更改数据分区会与集群节点上物理数据移Shuffle的成本有关。所以我们一般的经验是,使用重新分区repartition的成本很高,因为它会引起Shuffle。

在本文中,我们将进一步了解在某些情况下,在正确的位置添加一个 Shuffle 会删除另外两个 Shuffle,从而使整体执行更加高效。我们将先介绍一些理论,以了解 Spark SQL 内部如何利用有关数据分布的信息,然后我们再介绍一些使用重新分区变得有用的实际例子。

1. Query Planning (查询计划)

在Spark SQL中允许用户使用高阶的DataFrame API来编写可执行代码,Spark DataFrame API提供了创建算子,转换算子,控制算子,行动算子等丰富的编程函数。

用户可以首先通过创建算子定义DataFrame,然后通过转换算子transformation对数据进行处理和操作,其中这些转换是惰性的,这意味着它们不会立即执行,而是在幕后转换为查询计划。

当用户调用一个行动算子Action操作时,查询计划将被具体化。这些行动算子Action一般是我们要求某些输出的函数,例如当我们将转换结果保存到某个存储系统。

查询计划本身可以有两种主要类型:逻辑计划和物理计划。相应地,查询计划处理的步骤可以分为逻辑计划部分和物理计划部分。

2. Logical Plan (逻辑计划)

逻辑计划本身只是查询计划的抽象表示,把它转换为树的形式,其中树中的每个节点都是关系运算符,这是Spark对运算的抽象表示,方便执行时的计算。

逻辑计划本身不携带有关执行或用于计算连接Join或聚合Agg等转换的算法的任何特定信息。它只是以一种便于优化的方式表示查询中的信息。

在逻辑规划期间,查询计划由 Spark 优化器进行优化,该优化器会应用一组优化的规则。这些规则主要基于启发式,例如,最好首先过滤数据,然后进行其他处理等等。

3. Physical Plan (物理计划)

一旦逻辑规划得到优化,物理规划就开始了。此阶段的目的是获取逻辑计划并将其转换为可以执行的物理计划。与非常抽象的逻辑计划不同,物理计划在执行细节方面更加具体,因为它包含执行期间将使用的算法的具体选择。

物理计划由两个步骤组成:sparkPlan 和 executedPlan。

spark plan 是使用所谓的strategies创建的,strategies是Spark中一系列的逻辑计划转换为物理计划的策略,在执行时会遍历整个逻辑计划树中的每个节点node,每个节点node都会经过strategies转换为Spark 计划中的一个或多个运算符。

strategies的一个示例是JoinSelection,它决定 Spark 使用哪种Join算法来连接数据。

我们可以使用 Scala API 来查看Spark Plan的表示形式:

df.queryExecution.sparkPlan // 在 Scala 中

生成sparkPlan后,将应用一组附加规则来创建物理计划的最终版本,即executedPlan。然后执行这个executedPlan来生成RDD代码。

我们可以简单直接地在 DataFrame 上调用explain来查看这个executedPlan*,因为它实际上是物理计划的最终版本。当然,我们也可以转到 Spark UI 查看其图形表示。

4. 与分区相关的规则与策略

4.1 ER 规则(EnsureRequirements)

ER 规则,*EnsureRequirements,是用于将Spark Plan转换为executedPlan的附加规则中的非常重要的一条。该规则主要是确保通过一些链接Join和聚合Agg算子时,数据可以被高效正确的策略来分配。

物理计划中的每个算子都有两个重要的属性outputPartitioningoutputOrdering,它们携带有关数据分布、数据在给定时刻如何分区和排序的信息。除此之外,每个运算符还有另外两个属性requiredChildDistributionrequiredChildOrdering ,通过它们对其子节点的outputPartitioningoutputOrdering的值提出要求,在进行转换时需要被确保满足。当然,并非所有的操作都存在对子节点的要求。

下面让我们通过SortMergeJoin操作示例来了解下。因为SortMergeJoin一个对其子节点要求很高的算子,它要求数据必须按照连接键(join key)进行分区和排序,才能正确合并。

Spark.table("tableA") \ 
.join(spark.table("tableB"), "id") \ 
.write 
...

上面有个tableA Join tableB的例子,该查询的SparkPlan如下所示(因为Spark中的查询计划是一个倒树,所以其子节点是在父节点的上面):

sparkPlan中我们可以看出, SortMergeJoin的两个子节点(两个Project算子)没有outputPartitioningoutputOrdering的信息(它们是UnknownNone),这是一种一般情况,数据没有提前重新分区,表是没有装桶。

当Spark将ER 规则应用于sparkPlan时,它可以得出SortMergeJoin的要求没有被满足,这时它将向sparkPlan填充ExchangeSort运算符以满足要求。Exchange运算符将负责对数据重新分区以满足requiredChildDistribution要求,而Sort运算符将对数据进行排序来满足 requiredChildOrdering的要求。因此最终执行的计划将如下所示:

4.2 分桶 (Bucketing)

如果两个表都通过连接键Join key进行了分桶,情况会有所不同。分桶是一种以预洗牌Pre-shuffle和可能预排序Pre-sorted状态来存储数据的技术,其中有关分桶的信息存储在metastore中。

在这种情况下,FileScan操作将根据来自metastore中的信息设置outputPartitioning,如果每个存储桶只有一个文件,则 outputOrdering 也将被设置,并将全部传递到下游的Project算子中。如果两个表都通过连接键存储到相同数量的存储桶中,则将满足outputPartitioning的要求,那么ER 规则不会将任何Exchange添加到计划中。在分桶策略中,连接Join两侧的分区数量相同是至关重要的,如果不同,则仍必须插入Exchange算子来使得分区数相同,一般我们可以通过spark.sql.shuffle.partitions配置来设置分区数。

所以说,如果我们可以正确的存储数据,Join连接可以是无shuffle操作的。

需要注意的是以上的分桶策略,Spark需要从metastore提取分桶的信息来设置*FileScan算子中的outputPartitioning,所以需要依赖Hive表。当然你可以可以实现自定义的Rule和算子来改善Spark这个鸡肋的行为。

4.3 重新分区(Repartition)

在spark中我们可以使用repartition来改变数据在集群上的分布,该函数将数据列作为参数,当然也可以设置分区数。在实现上,它将一个RepartitionByExpression节点添加到逻辑计划中,然后使用strategies将其转换为SparkPlan中的Exchange ,并将outputPartitioning设置为HashPartitioning,用作为参数的列名称作为键值。

repartition函数的另一种用法是,可以仅使用一个参数来调用它,该参数是应创建的分区数 ( repartition(n)),这将随机分配数据。

现在让我们看一些实际的示例,其中通过某些特定字段使用repartition来调整分布会带来一些效果。

示例一:单侧无shuffle join

如果上述join连接中的一个表已分桶而另一个表未分桶,会发生什么情况。在这种情况下,由于outputPartitioning两侧不同(一侧由分桶定义,而另一侧则为Unknown),因此无法满足要求。那么这时,ER规则会将Exchange添加到Join连接的两个分支中,因此Join连接的每一侧都必须进行Shuffle。

Spark 会简单地忽略一侧已经预先shuffle,并会浪费这个机会来避免shuffle。这里我们可以简单地在Join连接的另一端使用repartition,来确保在ER规则检查之前添加Exchanges并设置outputPartitioning。

# tableA 未分桶
# tableB 按id分桶到 50 个桶
Spark.table("tableA") \ 
.repartition(50, "id") \
 .join(spark.table("tableB"), "id") \ 
.write \ 
...

调用repartition会将一个Exchange添加到计划的左侧分支,但右侧分支将保持无洗牌状态。因为现在将满足要求,并且ER 规则将不再添加任何Exchange。因此,最终计划中我们只会进行一次洗牌,而不是两次洗牌。

或者,我们可以更改 shuffle 分区的数量以匹配tableB中的存储桶数量,在这种情况下不需要重新分区,因为ER 规则*将使右侧分支不进行 shuffle,并且它将仅调整左分支(与重新分区的方式相同):

# 将连接右分支中的桶数与shuffle分区数进行匹配:
Spark.conf.set("spark.sql.shuffle.partitions", 50)

示例 二:先聚合后Join

repartition有性能提升的另一个示例与查询有关。

在查询中,我们通过两个键聚合一个表,然后将这两个键之一连接另一个表(在这种情况下,这些表都没有存储桶)。让我们来看个简单示例:

{“id”:1,“user_id”:100,“price”:50,“date”:“2020-06-01”} 
{“id”:2,“user_id”:100,“price”:200 ,“date”:“2020-06-02”} 
{“id”:3,“user_id”:101,“price”:120,“date”:“2020-06-01”}

如上所示,每个用户可以在数据集中拥有许多行,因为他/她可以进行多次交易,这些交易都被存储在table A中。另一方面,tableB将包含每个用户的信息(姓名、地址等)。tableB没有重复,每条记录属于不同的用户。在我们的查询中,我们要计算每个用户在每个日期里的交易数量,同时需要关联用户个人信息。

dfA = Spark.table("tableA") # 交易(未存储桶)
dfB = Spark.table("tableB") # 用户信息(未存储桶)
dfA \ 
.groupBy("user_id", "date") \ 
.agg(count("*")) \ 
.join(dfB, "user_id")

该查询的SparkPlan如下所示:

SparkPlan中,您可以看到有两个HashAggregate运算符,第一个(顶部)负责部分聚合,第二个负责最终合并,其次还有一个SortMergeJoin,它的要求与之前相同。

在这个例子中,有趣的部分是HashAggregates。第一个对其子节点没有任何要求,但是,第二个要求 outputPartitioning 按user_id和date或这些列的任何子集进行HashPartitioning。一般情况下,这些要求没有得到满足,那么ER 规则将添加Exchanges(和Sorts)节点。这将导致执行计划如下所示:

就如所看到的一样,我们最终得到了一个包含三个Exchange运算符的执行计划。因此在执行过程中将发生 3 次shuffle。

现在让我们看看如何使用repartition来改变这种情况:

dfA = Spark.table("tableA").repartition("user_id") 
dfB = Spark.table("tableB")
dfA \ 
.groupBy("user_id", "date") \ 
.agg(count("*")) \ 
.join(dfB, "user_id")

在进行repartition后,SparkPlan看起来有所改变,它将产生由从逻辑计划转换RepartitionByExpression节点的策略来生成的Exchange 。此Exchange将是第一个HashAggregate运算符的子节点,它将把outputPartitioning设置为HashPartitioning (user_id),该操作将传递到下游:

现在,左边Join分支中所有运算符的outputPartitioning要求都得到满足,因此ER 规则将不会添加额外的Exchange(它仍会添加Sort 以满足outputOrdering)。

这个例子中我们按两列进行分组,并且HashAggregate运算符的要求更加灵活,因此如果数据是按这两个字段中的任何一个进行分布的,都可以满足要求。最终执行的计划将在左侧分支中只有一个Exchange(右侧分支中只有一个),因此使用repartition,我们的shuffle次数减少了 1,如下图所示:

需要注意的是,确实使用repartition后,我们现在左分支中只有一个shuffle而不是两个,但重要的是要了解这些shuffle其实是不同一类型的!在最初的sparkPlan中,两个Exchange都在HashAggregate之后, HashAggregate负责部分聚合,因此在shuffle之前对数据进行了缩减(在每个节点本地聚合)。在新的执行计划中,Exchange出现在HashAggregate之前,因此是将完整的数据进行了shuffle。

那么那种更好的呢?是完整的一次shuffle还是两次缩减的shuffle?

这最终取决于数据的属性。如果每个user_iddate只有很少的数据,则意味着聚合不会减少太多数据,因此总 shuffle 与减少的数据相当,并且只有 1 次 shuffle 会更好。另一方面,如果每个user_iddate有庞大的数据记录,聚合将使数据变得更小,因此按照原始计划可能会更好,因为这两个小shuffle可能比一个大shuffle更快。这也可以用这两个字段user_iddate的所有不同组合的基数来表达。如果此基数与总行数相当,则意味着groupBy转换不会减少太多数据。

示例 三:两个聚合的并集

让我们基于前面的例子再考虑一个情景。现在,在我们的查询中,我们想要对两个不同的聚合进行并集,在第一个聚合中,我们将计算每个用户的行数,在第二个聚合中,我们将对价格列求和。

如下所示:

countDF = df.groupBy("user_id") \ 
.agg(count("*").alias("metricValue")) \ 
.withColumn("metricName", lit("count"))
sumDF = df.groupBy("user_id") \ 
.agg(sum("price").alias("metricValue")) \ 
.withColumn("metricName", lit("sum"))
countDF.union(sumDF)

这是该查询的最终执行计划:

这是类似Union联合查询的典型计划,Union中的每个 DataFrame 都有一个分支。我们可以看到有两次Shuffle,每个聚合一次。除此之外,根据执行计划,数据集将被扫描两次。

下面我们通过repartition函数来改变执行计划:

df = spark.read.parquet(...).repartition("user_id")
countDF = df.groupBy("user_id") \
.agg(count("price").alias("metricValue")) \
.withColumn("metricName", lit("count"))
sumDF = df.groupBy("user_id") \
.agg(sum("price").alias("metricValue")) \
.withColumn("metricName", lit("sum"))
countDF.union(sumDF)

repartition函数会将Exchange运算符移到HashAggregate之前,并使Exchange子分支相同,因此它将被另一个名为ReuseExchange的规则重用。在count函数中,我们将星号更改为price列在这里变得很重要,因为它将确保两个 DataFrame 中的投影相同(我们需要将price列*投影到左侧分支中,以使其与右侧分支相同)。然而,仅当price列中没有null值时,它才会产生与原始查询相同的结果。

与之前类似,我们在这里将shuffle数量减少了 1,但不同于之前我们现在再次得到了总shuffle,而不是原始查询中汇总shuffle的结果。这里的额外好处是,经过此优化后,由于重复使用了计算,数据集将仅扫描一次。

分布信息丢失

正如我们已经提到的,不仅以最佳方式分布数据很重要,而且让 Spark 知道数据是如何分布的也很重要。

例如,outputPartitioning的信息通过执行计划从子节点传播到其父节点。但是,即使我们实际数据分布没有改变,在经过某些算子时也会停止传播这些数据分布信息,导致Spark出现信息丢失。

其中BatchEvalPython就是这样一个运算符,该运算符表示 Python API 中的用户定义函数。因此,如果您对数据进行重新分区,然后调用 Python 的UDF,然后执行Join(或某种聚合),ER 规则将添加新的Exchange ,因为BatchEvalPython不会向下游传递outputPartitioning信息。我们可以简单地说,在调用Python UDF之后,Spark会忘记数据是如何分布的。

控制生成文件的数量

在将数据分区和/分桶到存储系统时要控制生成的文件数量。

df \
.write \
.partitionBy(key) \
.save(path)

如果数据在Spark作业的最后stage随机分布,它将会产生大量小文件。最后stage的每个任务可能包含所有键的值,因此它将在每个存储分区中创建一个文件,从而产生大量小文件。

我们在写入之前调用自定义repartition重新分区允许调节和设置所需的分布,从而控制生成的文件的数量。

总结

重新分区repartition功能允许我们改变Spark集群上数据的分布。这种分布变化将在执行过程中引起Shuffle(物理数据移动),这是一个相当昂贵的操作。

我们举了一些示例,通过repartition所带来的这种额外的Shuffle可以同时合并和删除一些其他Shuffle,从而使整体执行更加高效。

其次,我们还看到,在优化执行计划时,一定要区分两种类型的Shuffle,即完全Shuffle(移动所有数据)和简化Shuffle(在部分聚合后移动数据)。有时,虽然我们减少了Shuffle的数量,但任务的执行时间却变的更长了,这取决于不同情况下数据的属性。


如果觉得这篇文章对你有所帮助,
请点一下或者,是对我的肯定和支持~


请使用浏览器的分享功能分享到微信等