当处理大规模数据集时,Apache Spark是一个强大的工具,但有时查询速度可能会成为瓶颈。为了快速提升Apache Spark的查询速度,以下是五个技巧可以帮助你优化和加速查询过程:
技巧1: 过滤尽可能多的数据
一般来说,我们只想提取查询中所需的数据,但实际上SQL的执行却把所有数据扫描了一遍。
如果我们在读取数据时,只扫描SQL相关数据,那么这将有助于我们节省读取操作所需的成本和时间,并确保我们只对与相关的数据执行转换操作。
这就好像搬家一样——在你决定扔掉哪些东西之后再把需要的东西都搬到新地方,这样是最节省能源和时间。
对于Spark的过滤这里有2种方式:
第一个是 partition filters;
首先,我们应该根据基础表的分区方式进行过滤。通常,它是某种日期列。
你可以使用下面的SQL进行简单的检查,找出您的表正在使用哪个分区列。
DESCRIBE TABLE EXTENDED my_table
在查询时,确保我们尽可能只检索基于此分区的相关数据。
由于我们的基础表本质上将存储为多个单独的文件(基于分区列),因此仅提取我们需要的文件将大大减少扫描所有文件夹数和加载表所需的时间。
第二个是pushed filters;
它在本质上是将整个Spark 作业中的任何位置中的filters“推送”到靠近源表中的位置。
例如,如果您将多个订单表连接在一起,执行多次转换,并且仅在最后过滤掉客户 ID,那么 Spark 会尝试将此过滤器移动到您第一次加载订单表时,因此您不需要在转换中包含不必要的其他客户的数据。
然而,并非每个查询都是简单的,因此,在某些情况下,过滤器可能无法“推送”到源。(即,如果您要缓存中间数据集)
因此,仔细检查 Spark 查询计划以查看它是否包含在内始终是一个好主意。
如上面图片显示的,您可以看到,对于以下查询,我们的分区过滤器包含为date = 20230215,并且我们PushedFilter适用于country_id = 3。
SELECT order_id, sales_amount, country_id
FROM sales.daily_orders
WHERE country_id = 3
AND date = 20230215
除了上述之外,我们还应该只选择与我们相关的列。
所有这些都有助于降低数据大小,从而加快其他转换过程。
技巧2: 对维度表进行Broadcast Hash Joins
Spark在连接表时(Join),将使用多种类型的连接策略(SortMergeJoin、ShuffleHashJoin 等)。
其中最有效的连接是广播连接(Broadcast join)。此连接的工作原理是将较小的表广播到 Spark 应用程序的所有节点,这样较小的表将不需要在转换期间跨分区进行Shuffle。
通常,这种情况下较小的表指的是维度表,spark将此阈值设置在1.5GB以下。我们可以对增量表的物理空间进行简单的检查:
spark.sql("describe detail schema.delta_table_name").select("sizeInBytes").collect()
但是,在某些情况下,您的维度表可能会超过 1.5GB,或者事实表可能会低于 1.5GB。如果发生这种情况,我们可以增加这个限制
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 12345678)
或者我们可以在连接查询中包含广播连接提示
from pyspark.sql.functions import broadcast
fact_table.join(
broadcast(dimension_table),
dimension_table.id = fact_table.id
)
注意事项: 有大约 8GB 的硬限制(从 Spark 3.4 开始),并且它仍然应该能够适合驱动程序和执行程序内存,否则我们将开始出现 OOM 错误。因此,如果您的表预计将来会大幅增加,请避免这样做。
技巧3: 缓存中间数据集
Spark 的转换算子都是延迟转换的。也就是说,在我们执行真正的“操作”之前,不会完成任何实际计算,例如展示最终数据集或计数操作。这就是 Spark 如此强大的原因,RDD链接所有转换操作,然后在可能的情况下优化它们。
但这意味着当您将任何数据帧存储在变量中并作为通用中间数据帧共享时,它实际上并不物理上“存在”。
以下面的转换为例,其中sales_df是包含每日销售数据的事实表,employee_df是包含员工详细信息的维度表:
sales_employee_df = sales_df.join(employee_df, how= 'left' , on = 'employee_id' )
我们已连接两个表并将它们分配给sales_employee_df。例如,我们要执行 2 个单独的转换:
sales_by_employee = sales_employee_df.groupBy( "employee_id" ).sum( "sales_amount" )
sales_by_region = sales_employee_df.groupBy( "employee_country" ).sum( "sales_amount" )
然后,我们将触发Join连接(销售人员和员工 df 之间)两次!
对于 Spark,这是 2 个独立的 Spark 作业,因此将提交两次并产生双倍的成本和时间。
这可能不是一个复杂的转换,但假设我们的中间数据集是一个非常昂贵的操作(涉及连接多个表中的数百万行),那么差异就开始显现出来。
为了解决这个问题,我们可以利用缓存。这使我们能够将数据集保留在内存中,以便在需要时重用。然而,值得注意的是,我们只应该在整个管道中多次重用 Dataframe 时才缓存 Dataframe,并且中间 Dataframe 是长期且昂贵的转换的结果。我们还应该记住,当不再需要它时,最好再对其取消持久化。
要缓存 Dataframe,您可以调用.cache()或.persist()方法。因为,它们是惰性转换,因此我们需要调用像.count()这样的操作方法。
sales_employee_df.cache()
sales_employee_df.count()
技巧4:避免 UDF
在某些时候,您可能会在 Spark 管道中遇到甚至使用用户定义函数 (UDF) 。
虽然它是定义和应用自定义函数的便捷方法,但它会严重降低查询效率,尤其是在 Python 中。
原因是代码需要在Python解释器和JVM之间进行序列化和反序列化,而不是让Spark SQL查询直接在JVM本身上运行,这会带来一定的开销。
此外,当我们处理 UDF 时,Spark中的一些优化工作(例如谓词下推过滤器)无法Work。
因此,我们应该尽量避免将它们纳入我们的管道中。您尝试做的事情很可能可以使用本机 Spark 转换或高阶 SQL 函数来完成。如果绝对必须使用它们,请考虑尝试 Scala UDF 或 Pandas UDF。
技巧5 :检查 Spark 任务中是否有倾斜和溢出
对于本技巧,我们必须深入研究 Spark history Server的页面。
对于Spark history Server这是一个稍微复杂一点的问题,可能需要另一篇文章来讨论,但我们可以尝试一些快速查明并修复问题的根源。
首先,转到 Spark history Server,如果是正在运行的任务可以直接查看Spark UI, 对于已经完成运行的就需要到历史服务器页面查看。
如果您从集群的 Spark UI 轻松访问它。在“Job”页面下找到您的 Spark 作业,然后查看任务指标。
如果您在此页面上看到Spill(Disk/Memory),则表明正在发生溢出。
这可能是由数据集中的重分区(或Join连接)引起的。当Executor中处理数据时,如果Executor无法在内存/RAM 中保存如此多的数据,就会发生这种情况,而这导致它们必须先将其中一些数据保存到磁盘中。因此就有了术语“溢出”。
此外,Summary指标还将向您显示记录的分布和任务的持续时间,您可以在其中检查偏差。

在上面的示例中,我们可以看到,从75 %到100%的任务,需要花费 500 秒以上才能完成,并且它们处理的Input Size/Records比其他从0%~75%任务多几倍。
这种现象被描述为数据倾斜,它会导致性能瓶颈,因为大多数任务需要相对较短的时间才能完成,而少数任务会被卡住并需要更长的时间才能完成,由于每个任务的记录分布不平衡。
倾斜和溢出可能是相互关联的,如果解决了倾斜数据问题,由于分布更加均匀,溢出问题很可能得到解决,这使得只有一两个核心共享大部分数据的可能性较小。加载。
为了解决这些问题,我们可以尝试以下方法
检查 AQE 是否已启用,如果没有,请通过 启用它
spark.conf.set("spark.sql.adaptive.enabled", true),这可以解决常见的根本问题;检查维度表是否使用广播哈希连接Join,如上所述;
在执行Join连接之前检查您的数据框。例如,连接键中可能存在大量 NULL,然后我们可以先将它们分开,然后加入数据集的其余部分,最后进行并集将它们重新组合在一起。
调整随机分区以确保每个分区处理大约 200MB 的数据
spark.conf.set("spark.sql.shuffle.partitions", 12345)加盐
具体可以参考文章:
总结
在Spark任务运行中,上述的小Tips可以帮你快速的提效,下面我们简单总结下:
首先,建议在读取数据时仅扫描与查询相关的数据,并将过滤器尽可能推送到靠近源表的位置,以减少不必要的数据读取。
其次,对于Join连接操作,推荐使用广播哈希连接,以避免跨分区进行Shuffle。
再次,可以通过使用缓存将数据集保留在内存中,在需要时重用,从而提高查询效率。
此外,尽量避免使用用户定义函数(UDF),而是使用本机Spark转换或高阶SQL函数来替代,以减少代码的序列化和反序列化开销。
最后,通过观察Spark任务指标,可以检查是否存在数据倾斜和溢出问题,并采取相应的解决措施,例如启用自适应执行或调整分区数。
这些技巧的应用可以显著提高Spark查询的性能和效率。