谓词下推是指将过滤条件(谓词)应用于查询的尽可能早的阶段,以减少需要处理的数据量。
具体而言,当Spark执行查询时,它会尝试将过滤条件应用于数据源本身,而不是将整个数据集加载到内存中后再进行筛选。这样可以避免对不符合条件的数据进行处理,节省了计算资源和时间。
投影下推是指将查询中不需要的列从数据源中过滤掉,只加载查询需要的列。通过减少所需的列数量,可以减少磁盘IO和网络传输的数据量,提高查询性能。
Spark 3引入了一种称为列式存储(columnar storage)的技术,使得投影下推变得更加有效。
Spark 2.x 和 Spark 3.0 嵌套过滤的差异
使用Spark 2.x ,可以读取最多 2 级嵌套结构且扩展名为.json和.parquet的文件。
1filter(col(‘library.books’).isNotNull())
借助Spark 3,现在可以在 2 级以上嵌套结构中读取具有parquet和.snappy parquet扩展名的文件,而无需进行模式扁平化操作。
1filter(col(‘library.books.title’).isNotNull())
用于下推过滤的 Spark 会话配置
创建 Spark 会话时,应启用以下配置才能使用 Spark 3 的下推功能。
1"spark.sql.parquet.filterPushdown", "true"
2"spark.hadoop.parquet.filter.stats.enabled", "true"
3"spark.sql.optimizer.nestedSchemaPruning.enabled","true"
4"spark.sql.optimizer.dynamicPartitionPruning.enabled", "true"
Spark 中有两种下推过滤技术:谓词下推和投影下推,具有以下不同的功能:
谓词下推
谓词下推指向影响返回行数的where或filter子句。它基本上涉及将过滤哪些行 ,而不是哪些列。因此,当在“library.books”这样的嵌套列上应用过滤器以仅返回值不为空的记录时,谓词下推函数将使 parquet 读取不包含指定列的空值的块。
带有分区修剪的谓词下推
分区消除技术可以优化从相应文件系统读取文件夹时的性能,以便可以读取指定分区中所需的文件。它将解决将数据过滤转移到尽可能靠近源的位置,以防止将不必要的数据保留到内存中,从而减少磁盘 I/O。
下面,可以观察到下推操作的分区过滤器,即'library.books.title' = 'THE HOST'过滤器被下推到parquet文件扫描中。此操作可以最小化文件并扫描数据。
1data.filter(col('library.books.title') == 'THE HOST').explain()
对于更详细的输出,包括*解析的逻辑计划、分析的逻辑计划、优化的逻辑计划、物理计划,可以在explain()函数中添加“扩展”参数,如下所示:
1data.filter(col('library.books.title') == 'THE HOST').explain('extended')
Parquet 格式的文件为每列保留一些不同的统计指标,包括其值的最小值和最大值。
谓词下推有助于跳过不相关的数据并处理所需的数据。
投影下推
投影下推代表使用select子句选择的列,它会影响返回的列数。它将数据存储在columns中,因此当您的投影将查询限制为指定列时,将返回具体的那些列。
1data.select('library.books.title','library.books.author').explain()
这意味着对“library.books.title”、“library.books.author”列的扫描,
意味着扫描将在将数据发送回 Spark 引擎之前在文件系统/数据库中进行。
结论
对于投影和谓词下推,有一些关键点需要强调。
下推过滤适用于根据parquet格式文件的分区列计算。为了能够从中获得最大的好处,分区列应携带较小大小的值和足够的匹配数据,以将正确的文件分散在目录中。防止过多的小文件因并行性过高而导致扫描效率降低。此外,防止接受太少的大文件可能会损害并行性。
投影下推功能通过消除表扫描过程中不必要的字段,最大限度地减少文件系统/数据库和 Spark 引擎之间的数据传输。当数据集包含太多列时,它有很大作用。
谓词下推通过在过滤数据时减少文件系统/数据库和 Spark 引擎之间传递的数据量来提高性能。
投影下推是基于列的过滤,谓词下推是基于行的过滤。