[SPARK][SQL] 聊聊Spark 3.3 中的Runtime Filter Joins

今天来聊聊Spark 3.3 中的Runtime Filter Joins的实现。

行级运行时过滤器(row-level runtime filters)指的是,Spark 可以根据需要在查询计划中注入和下推 Filter,以便在早期过滤数据,减少 shuffle 和后期计算的中间数据大小。这可以用于补充动态分区修剪(dynamic partition pruning,DPP)和动态文件修剪(dynamic file pruning,DFP),以适应动态文件跳过(dynamic file skipping)不够适用或不够彻底的情况。

在Spark中,Join是一个非常耗费资源耗费时间的操作,一般Join中都会涉及到Shuffle的操作,需要大量的IO操作。 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等)。Runtime Filter Joins的思路很简单就是利用小表的Join keys基于大表Join keys构造过滤器,来减少大表的数据读取

在电商的星型数仓的数据关联场景中,可以充分利用过滤之后的维度表,大幅削减事实表的数据扫描量,从整体上提升Join计算的执行性能。

运行时行级过滤

Runtime Filter是在数据库中广泛使用的一种优化技术,其基本原理是通过在join的probe端提前过滤掉那些不会命中join的输入数据来大幅减少join中的数据传输和计算,从而减少整体的执行时间。

下面我们用一个例子来进行说明:

1SELECT * FROM order JOIN items ON order.item_id = items.id WHERE items.price > 7999


例如上图所示是电商场景中场景的Join场景。在执行上述的Join SQL的时候,不仅需要把全量的order数据传输到join算子里去,还需要依次对Order表中的数据进行Hash和比较运算。这里如果小表的选择度比较高,那么Order大表中的大部分数据是几乎用不上的,如果提前进行过滤掉,可以减少数据的传输和计算的开销。

一句话来表明Spark中的Bloom Filter Joins,即通过使用 Bloom 过滤器和Join另一侧的Join Keys的值来生成 IN 谓词,然后对Join的一侧进行预过滤来提高某些Join的性能。

那么Spark中的运行时的行级过滤是如何实现的呢?

在Spark中使用spark.sql.optimizer.runtime.bloomFilter.enabled和spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled属性启用的行级运行时过滤器。它引入了两种不同的优化,分别是基于聚合的连接的聚合布隆过滤器和将join转换为带有子查询的Semi-Join Filter。

那为什么有了BloomFilter还需要另一个策略转换为Semi-Join?

这里的原因是考虑到BloomFilter有假阳性的问题,另外如果数据量比较大其结果可能会出现判断错误的情况。所以说如果当构造表数据量较大,但其join keys足够小(或经过过滤后)可以进行广播,那么使用Semi-Join可能会带来更多的收益。

runtimeFilter 源码分析

1. 尝试注入RuntimeFilter,并校验是否满足应用条件。

RuntimeFilter的调用来源于Spark optimizer。在Spark的优化器中注入了runtimeFilter的Rule,在对SQL进行优化时会调用到这个Rule。下面我们来看看InjectRuntimeFilter是如何对SQL进行优化的,最终优化的Plan会长成什么样子呢?

1Batch("InjectRuntimeFilter", FixedPoint(1),
2  InjectRuntimeFilter,
3  RewritePredicateSubquery)

在调用InjectRuntimeFilter规则时会运行其apply方法,在Apply方法中会判断spark.sql.optimizer.runtime.bloomFilter.enabled和spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled是否被打开,如果都没有打开则不会进行runtimeFilter的优化。否则会调用tryInjectRuntimeFilter方法进行尝试进行运行时过滤的优化。

 1private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
2  var filterCounter = 0
3  // [1] 这里为了避免driver端的oom, 约定一条sql最多只能做10个bloom过滤器
4  val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
5  plan transformUp {
6    // 可以看到runtimeFilter只处理相等key的情况
7    case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
8      var newLeft = left
9      var newRight = right
10      (leftKeys, rightKeys).zipped.foreach((l, r) => {
11        // [2] 校验是否满足以下情况
12        // Check if:
13        // 1. There is already a DPP filter on the key
14        // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
15        // 3. The keys are simple cheap expressions
16        if (filterCounter < numFilterThreshold &&
17          !hasDynamicPruningSubquery(left, right, l, r) &&
18          !hasRuntimeFilter(newLeft, newRight, l, r) &&
19          isSimpleExpression(l) && isSimpleExpression(r)) {
20          val oldLeft = newLeft
21          val oldRight = newRight
22          // [3] 进行应用injectFilter,如果左侧不行会尝试右侧
23          if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) {
24            newLeft = injectFilter(l, newLeft, r, right)
25          }
26          // Did we actually inject on the left? If not, try on the right
27          if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
28            filteringHasBenefit(right, left, r, hint)) {
29            newRight = injectFilter(r, newRight, l, left)
30          }
31          if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) {
32            filterCounter = filterCounter + 1
33          }
34        }
35      })
36      join.withNewChildren(Seq(newLeft, newRight))
37  }
38}

从上面源码可以看出,即使开启了开关,也需要进行校验是否满足了以下的情况:

[1] 已注入BloomFilter的数量低于spark.sql.optimizer.runtimeFilter.number.threshold(默认为 10), 这里限制的原因是避免 OOM 问题。

[2] 到目前为止优化的逻辑计划中不存DDP。

[3] BloomFilter和Semi-Join过滤器都不存在已有的RuntimeFilter在对应的Join Keys上。

[4] 左侧和右侧join keys是简单的表达式,此模式匹配的计算操作并不昂贵, 即不包含以下的操作。

1private def isSimpleExpression(e: Expression): Boolean = {
2  !e.containsAnyPattern(PYTHON_UDF, SCALA_UDF, INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY,
3    REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
4}

当以上的条件都满足的话,则会进行应用injectFilter的操作,从上面的源码可以看出,会左侧和右侧分别进行尝试应用。下面我们以应用左表进行分析。在应用左侧的filter前,会先根据join keys进行判断是否可以canPruneLeft, 以及通过filteringHasBenefit判断是否应用Filter可以得到收益。

1def canPruneLeft(joinType: JoinType): Boolean = joinType match {
2  case Inner | LeftSemi | RightOuter => true
3  case _ => false
4}

判断left表是否可以进行剪枝,join类型需要为Inner或LeftSemi或RightOuter类型。

 1private def filteringHasBenefit(
2    filterApplicationSide: LogicalPlan,
3    filterCreationSide: LogicalPlan,
4    filterApplicationSideExp: Expression,
5    hint: JoinHint)
: Boolean = {
6  findExpressionAndTrackLineageDown(filterApplicationSideExp,
7    filterApplicationSide).isDefined && isSelectiveFilterOverScan(filterCreationSide) &&
8    (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) ||
9      probablyHasShuffle(filterApplicationSide)) &&
10    satisfyByteSizeRequirement(filterApplicationSide)
11}

Plan转换是否对查询有益,主要是通过以下条件进行判断的:

[1] 如果过滤器应用端的输入来自单个叶子节点, 过滤器应用程序端是将使用运行时过滤器的连接端。

[2] 过滤器创建端的过滤表达式有一个选择性谓词,例如“=”、“<=”、“IN”、… 完整列表在PredicateHelper#isLikelySelective方法中可用。

[3] 当前的连接树是一个Shuffle连接或Broadcast连接,其下有一个Shuffle连接。

[4] 过滤器应用端的大小大于spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold(默认为 10GB)。

当左侧可以进行prune, 同时应用runtimeFilter预计会产生收益,则会调用injectFilter方法。

 1private def injectFilter(
2    filterApplicationSideExp: Expression,
3    filterApplicationSidePlan: LogicalPlan,
4    filterCreationSideExp: Expression,
5    filterCreationSidePlan: LogicalPlan)
: LogicalPlan = {
6  require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
7  if (conf.runtimeFilterBloomFilterEnabled) {
8    injectBloomFilter(
9      filterApplicationSideExp,
10      filterApplicationSidePlan,
11      filterCreationSideExp,
12      filterCreationSidePlan
13    )
14  } else {
15    injectInSubqueryFilter(
16      filterApplicationSideExp,
17      filterApplicationSidePlan,
18      filterCreationSideExp,
19      filterCreationSidePlan
20    )
21  }
22}

从上面可以看出,如果开启BloomFilter则应用injectBloomFilter,如果开启SemiJoinReduction则应用injectInSubqueryFilter。下面我们依次来看下:

2. 开启BloomFilter,将plan转换为带有聚合布隆器的Join。

我们来总的看下运行时BloomFilter的实现。从上图可以看出,在开启BloomFilter后,会从Dataset1中根据过滤条件筛选出join所需要的join keys, 然后通过聚合将其更新到bloomFilter中,然后在数据扫描加载Dataset2时,会根据join keys去bloom过滤器中查询是否满足条件,只将满足条件的数据进行查询出,这样可以减少扫描的数据量。下面我们来看下injectBloomFilter方法:

 1private def injectBloomFilter(
2    filterApplicationSideExp: Expression,
3    filterApplicationSidePlan: LogicalPlan,
4    filterCreationSideExp: Expression,
5    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
6  // [1] 如果creation side(小表侧)的sizeInBytes大于阈值(默认是10M)则不进行应用。
7  // Skip if the filter creation side is too big
8  if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
9    return filterApplicationSidePlan
10  }
11  // [2] 将creation side(小表侧)的join keys进行hash后封装bloom过滤器的聚合函数,并封装为agg表达式,过滤器的大小设置为表的行数。
12  val rowCount = filterCreationSidePlan.stats.rowCount
13  val bloomFilterAgg =
14    if (rowCount.isDefined && rowCount.get.longValue > 0L) {
15      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
16        Literal(rowCount.get.longValue))
17    } else {
18      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
19    }
20  val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
21  val alias = Alias(aggExp, "bloomFilter")()
22  // [3] 直接将creation side(小表侧)的逻辑计划应用bloomFilter,同时进行列剪裁和常量合并
23  val aggregate =
24    ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
25  // [4] 将ApplicationSide(大表侧)的join keys进行hash, 并去BloomFilter查询匹配
26  val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
27  val filter = BloomFilterMightContain(bloomFilterSubquery,
28    new XxHash64(Seq(filterApplicationSideExp)))
29  // [5] 最终是封装为Filter算子,插入到应用侧(即大表侧)
30  Filter(filter, filterApplicationSidePlan)
31}

从上面的代码可以看出,主要有以下几个步骤:

[1] 如果creation side(小表侧)的sizeInBytes大于阈值(默认是10M),则不进行应用。

[2] 将creation side(小表侧)的join keys进行hash后封装bloom过滤器的聚合函数,并封装为agg表达式,过滤器的大小设置为表的行数。

[3] 直接将creation side(小表侧)的逻辑计划应用bloomFilter,同时进行列剪裁和常量合并

[4] 将ApplicationSide(大表侧)的join keys进行hash, 并去BloomFilter查询匹配

[5] 最终是封装为Filter算子,插入到应用侧(即大表侧)

布隆过滤器的调用BloomFilterMightContain的方法中,主要根据key的hash值其bloom过滤器中进行查询。

 1public boolean mightContainLong(long item{
2  int h1 = Murmur3_x86_32.hashLong(item, 0);
3  int h2 = Murmur3_x86_32.hashLong(item, h1);
4
5  long bitSize = bits.bitSize();
6  for (int i = 1; i <= numHashFunctions; i++) {
7    int combinedHash = h1 + (i * h2);
8    // Flip all the bits if it's negative (guaranteed positive number)
9    if (combinedHash < 0) {
10      combinedHash = ~combinedHash;
11    }
12    if (!bits.get(combinedHash % bitSize)) {
13      return false;
14    }
15  }
16  return true;
17}

另外需要关注的是,这里的hash函数主要是采用Guava包中Murmur3Hash,另外这里的Bloomfilter是采用之前就给DataFrame实现的方法,其也是参照Guava中进行实现的,限于篇幅就不展开了。其实现主要在BitArray类中,在其内部采用long[] data来表示一个大的bitmap。

3. 开启SemiJoinReductionEnabled,则会将Plan转换为带有子查询的Semi-Join。

我们先来总的看下SemiJoinFilter的实现。其实这里和DDP非常类似,就是当过滤后的Join keys可以满足广播的情况,将其广播,然后封装为Filter算子,应用到Dataset2的扫描上。下面我们来看下injectInSubqueryFilter是如何实现的?

 1private def injectInSubqueryFilter(
2    filterApplicationSideExp: Expression,
3    filterApplicationSidePlan: LogicalPlan,
4    filterCreationSideExp: Expression,
5    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
6  require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
7  // [1] 将CreationSide的join keys进行hash, 然后进行聚合
8  val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
9  val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
10  val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
11  // [2] 如果不满足Broadcast,直接返回,默认10M
12  if (!canBroadcastBySize(aggregate, conf)) {
13    // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
14    // i.e., the semi-join will be a shuffled join, which is not worthwhile.
15    return filterApplicationSidePlan
16  }
17  // [3] 将aggregate和ApplicationSide的keys封装为InSubquery,最后封装为Filter条件
18  val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
19    ListQuery(aggregate, childOutputs = aggregate.output))
20  Filter(filter, filterApplicationSidePlan)
21}

从上面的代码可以看出,主要有以下几个步骤:

[1] 将CreationSide的join keys进行hash, 然后进行聚合

[2] 如果不满足Broadcast,直接返回,默认10M

[3] 将aggregate和ApplicationSide的keys封装为InSubquery,最后封装为Filter条件

这里的实现很简单就不在过多赘述了。

总结下Spark是如何实现runtimeFilter的,以及有何优缺点:

  1. spark通过InjectRuntimeFilter规则的注入实现两种运行时的filter,分别是bloomFilter和semi-join Filter,这里考虑到的收益和花费的均衡。

  2. 需要注意的是bloomFilter需要小表满足其sizeInBytes小于阈值(默认10M), semi-join Filter需要满足小表join keys聚合后可以广播。这里主要是BloomFilter有假阳性的问题,当小表大小大于阈值后,比较大数据量其结果可能会出现错误的情况。

  3. 从上面的分享可以看出,Spark的runtimeFilter, 在Plan的转换中会存在冗余计算的问题,小表存在两次扫描和过滤的问题。

请使用浏览器的分享功能分享到微信等