大家好,我是Tim。
在之前我们已经深入的解析过Spark SQL中的AQE、DPP等实现源码,但是并没有对Spark SQL整个的执行过程有个全面的概览性的介绍。
本篇文章将会对Catalyst 的工作原理有一个全局的了解,我将介绍 Spark SQL 中的 QueryExecution 类,以及在Spark SQL执行过程中所经历的每个阶段。
在Spark SQL的执行过程中,QueryExecution类贯穿始终,它是Spark执行关系查询的主要workflow。
一条SQL执行过程

如上图所示,一条SQL在Spark SQL引擎的解析过程中被分为逻辑计划阶段和物理计划阶段。
在逻辑计划阶段,当Spark SQL引擎接收到一条SQL查询时,它首先将被解析为一个Unresolved Logical Plan。
此时的SQL解析树并不包含具体的表和列的信息。接下来,解析树会经过Catalog的处理,Catalog会根据元数据信息将解析树与具体的表和列进行关联。
其次,解析树被传递给sessionState.analyzer组件,它的任务是将Unresolved Logical Plan转换为Logical Plan。
在这时,Spark SQL引擎会应用一系列的优化规则和转换操作,即RBO(Rule-Based Optimizer)如谓词下推和投影消除,以提高查询性能和效率。
此外,在经过sessionState.optimizer的处理后,Logical Plan会被转换为Optimized Logical Plan,这时就完成了逻辑计划阶段的工作。
在物理计划阶段,首先将经过优化后的Logical Plan将传递给queryPlanner.plan,这个阶段会生成多个可能的Physical Plans。
如果启用了CBO,将通过CBO来进一步评估和选择最优的Physical Plan执行计划,否则将选择第一个Physical Plan作为Selected Physical Plan。
在完成Selected Physical Plan后。如果启用了AQE,则SparkSQL引擎会应用自适应查询执行AQE技术,根据实际运行时的统计信息进行动态调整和优化。
最终,通过Codegen生成的RDD用于实际的数据处理和计算操作。这些RDD将分布在集群的不同节点上,并且可以并行处理大规模数据集,以达到高性能和可伸缩性的目标。
下面我们以一条关联销售表和商品表从中查询商品价格小于10的销售数据SQL为例,详细的介绍下这其中与QueryExecution相关的过程。
df = spark.sql('''
Select s_date, s_quantity, i_price
From sales
Join items On s_item_id = i_item_id
Where i_price < 10
''')
SQL Parsing
在Spark Catalyst 查询计划中,其内部数据结构是树结构。
在经过每一个阶段的过程中,都会通过从上到下或从下到上的遍历每一个节点,然后应用当前的规则进行转换。
如上SQL所示,输入的文本格式的SQL语句首先会被解析为树类型的数据结构。
当调用spark session的sql方法时,SQL解析器首先将sql语句解析为ANTLR ParseTree,然后将ANTLR ParseTree转换为未解析的逻辑计划(Unresolved Logical Plan)。
然后将未解析的逻辑计划(Unresolved Logical Plan)传递到 Dataset 的 ofRows 方法中以创建 QueryExecution 实例。
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}
当我们点进spark.sql的源代码中就可以看到该方法。
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
: DataFrame = sparkSession.withActive {
val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
qe.assertAnalyzed()
new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
}
从上面的代码可以看出,QueryExecution生命周期的开始就是在Dataset.ofRows方法中创建的。
以下是从 sql 语句中解析出的未解析的逻辑计划。对于我们的 sql 示例来说,我们可以看到,“sales”表和“items”表都是未解析,列名称未经验证,并且列上没有类型信息。

Analysis
Spark SQL Catalyst提供了一个逻辑查询计划分析器,它从SessionCatalog中检索表和属性信息,并将UnresolvedAttributes和UnresolvedRelations转换为完全类型化的对象。
SessionCatalog是对底层元数据存储(如Hive Metastore)的代理,你可以简单地把元数据存储看作是可以查找表和列的完整类型信息的地方。
根据Databricks发表的论文《Spark SQL: Relational Data Processing in Spark》,在分析阶段,逻辑查询计划分析器会执行以下操作:
通过名称在SessionCatalog中查找关系
映射命名属性,例如列名
确定具有相同值的属性,并给予唯一的ID
通过表达式传播和强制类型
回到我们的例子,经过Analysis分析后,我们得到了以下经过分析的逻辑查询计划LogicalPlan。
如你所见,逻辑查询计划现在知道了关系的完整名称和文件格式,列的数据类型被识别出来,并为每个列分配了唯一的ID。

执行的代码被封装在QueryExecution类中analyzed。
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
// We can't clone `logical` here, which will reset the `_analyzed` flag.
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
Logical Optimisation
在逻辑计划被分析后,逻辑计划优化器会对逻辑计划应用标准的基于规则的优化(RBO)。
Catalyst提供了大量的预构建的逻辑计划优化规则,每个规则都可以将结构化查询的一部分转换为一个优化的逻辑计划,这些规则被分成规则批次进行执行。
这些规则都会继承Rule类,在进行转换时会调用Rule的Apply方法一次遍历节点进行应用。

我们以PushDownPredicate谓词下推规则举例,正好它在我们举的SQL例子中并应用。
谓词下推将filter从Join操作之后移动到Join操作之前和数据加载之后,这种优化减少要Join的数据集的大小,并确保谓词在物理规划阶段被推送到数据源。

从上面的LogicalPlan我们可以看出,Filter操作被执行在Join里,即先于Join操作执行。
上述规则的执行是依赖于QueryExecution类中optimizedPlan,代码如下所示。
lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
// We do not want optimized plans to be re-analyzed as literals that have been constant folded
// and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
// of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
plan.setAnalyzed()
plan
}
Physical Planning
LogicalPlan逻辑计划是与平台无关的,它是不能被Spark引擎解释和执行的。
LogicalPlan逻辑计划需要通过SparkPlanner转换为SparkPlan。
在QueryExecution中提供了一个createSparkPlan方法,它以优化后的逻辑计划作为输入,并调用SparkPlanner的plan方法,该方法应用节点匹配策略,返回一组候选物理计划Physical Plans。
lazy val sparkPlan: SparkPlan = {
// We need to materialize the optimizedPlan here because sparkPlan is also tracked under
// the planning phase
assertOptimized()
executePhase(QueryPlanningTracker.PLANNING) {
// Clone the logical plan here, in case the planner rules change the states of the logical
// plan.
QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
}
}
如果我们开启CBO模式,即基于成本的模型分析,Spark将通过CBO从这一组候选物理计划Physical Plans选择最优的物理计划Physical Plan用于下一步的执行。
即使在Spark版本3.0.0,基于成本的模型(CBO)仍然是默认关闭的,临时解决方案是使用SparkPlanner返回的第一个计划,即使这个物理计划Physical Plan并不是最优的。
def createSparkPlan(
sparkSession: SparkSession,
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan = {
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(plan)).next()
}
看源码中的注释,你大概就能明白其中的意思,目前任然是选择第一个而非最优。
回到我们的例子中,优化后的逻辑计划中的逻辑操作符被转换为它们的物理对应操作符。
关系被DataSource策略转换为FileScan节点,其中过滤谓词被推送到数据源,只读取查询所需的数据。
Join Inner逻辑操作符通过Join策略转换为SortMergeJoin(我们需要鼓励Join策略选择更高效的连接算法,比如广播连接)。

Execution Preparation
SparkPlanner将逻辑计划转换为物理计划后,需要执行一系列准备规则来准备物理计划的执行。
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = {
// We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
// that the optimization time is not counted as part of the planning phase.
assertOptimized()
executePhase(QueryPlanningTracker.PLANNING) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}
如上源码所示,在执行前会调用prepareForExecution做最后的准备。例如这里可能会插入shuffle操作以备SparkPlan执行时满足要求。
private[execution] def preparations(
sparkSession: SparkSession,
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
CoalesceBucketsInJoin(sparkSession.sessionState.conf),
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
RemoveRedundantProjects(sparkSession.sessionState.conf),
EnsureRequirements(sparkSession.sessionState.conf),
// `RemoveRedundantSorts` needs to be added before `EnsureRequirements` to guarantee the same
// number of partitions when instantiating PartitioningCollection.
RemoveRedundantSorts(sparkSession.sessionState.conf),
DisableUnnecessaryBucketedScan(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf)
)
}
我们以EnsureRequirements 规则为例,它要求确保满足物理操作的所有要求(如果没有,则执行插入操作满足要求所需要求)。
在我们的示例中,SortMergeJoin 运算符计划用于查询中的联接操作,它在执行之前需要reshuffle和sort,EnsureRequirements 规则将添加这些必需的操作。

除了确保需求之外,EnsureRequirements规则还将确保子查询的规划、数据分区和排序的正确使用、交换和子查询的重用。此外,WholeStageCodeGen 也是在此阶段插入的。
总结
总的来说,一段SQL查询与QueryExecution相关的过程中需要经历以下几个阶段:
SQL Parsing:将SQL语句解析为树状结构的数据,然后转换为未解析的逻辑计划,并创建 QueryExecution 实例。
Analysis:通过SessionCatalog检索表和属性信息,将未解析的属性和关系转换为完全类型化的对象,并生成经过分析的逻辑查询计划。
Logical Optimization:应用基于规则的优化(RBO),如谓词下推规则,将逻辑计划转换为优化后的逻辑计划。
Physical Planning:将逻辑计划转换为物理计划,选择最优的物理计划用于执行(CBO)。
Execution Preparation:执行一系列准备规则来准备物理计划的执行,确保满足物理操作的所有要求,并插入必需的操作。如果启用了 AQE(Adaptive Query Execution),Spark SQL 引擎会根据实际运行时的统计信息进行动态调整和优化。
最终,通过 Codegen 生成的 RDD 用于实际的数据处理和计算操作,这些 RDD 将分布在集群的不同节点上,并且可以并行处理大规模数据集,以达到高性能和可伸缩性的目标。