向量化Parquet文件读取器是自Spark 2.0就添加的一个功能。
与逐行读取和解码不同,向量化读取器以列式格式批处理多行,并按列分批处理。
Databricks的性能测试显示,向量化Parquet读取器在列解码方面的速度比非向量化快了9倍。
本文就来扒一扒源码,看看Spark是如何支持向量化Parquet文件读取的。
ColumnBatch & ColumnVector 数据结构
Spark对Parquet文件进行列式读取,每次读取会返回一批数据,这批数据就被定义在ColumnBatch数据结构中。
ColumnBatch数据结构中定义了ColumnVectors数组,每个ColumnVector都将保存了批量读取的结果数据集中一个列,如下图所示。

存储在ColumnBatch中的数据集可以以基于“列”的视图的形式被下游消费,即逐列地进行访问。
然而,ColumnBatch也提供了一个rowIterator公共方法,它将以“行”的视图返回批处理数据集,其中每一行都封装在一个Spark SQL 的InternalRow对象实例中。
ColumnVector实例存储了批处理中某一列的字段值列表。可以通过rowId访问ColumnVector中的元素,rowId是该列中数值的基于批处理的本地0-based索引,即只能拿到这个“列的行值”。

ColumnVector可以用于存储任何相同类型的数据。
在Spark SQL查询过程中,特别是在返回大型数据集时,可能会返回大量批次。
为了最小化存储占用,Spark SQL矢量化Parquet读取器在整个数据加载过程中重复使用ColumnBatch和ColumnVector实例,而不是为每个批次创建新的实例。
这种设计使得存储占用可以忽略不计,使得ColumnVector可以优化计算效率而不是存储效率。
此外,ColumnVector支持堆内存中的内存存储和堆外内存中的内存存储。

OnHeapColumnVector是基于堆内存的实现,由一个内存中的JVM数组支持(实际上是多个数组,每个数组对应一个数据类型,但只有其中一个被使用)。
OffHeapColumnVector是在堆外实现的版本,它将列值存储在java堆之外。在内部,它使用sun.misc包中的Unsafe类来管理列向量的存储和访问。使用堆外版本的优势在于避免了JVM垃圾回收带来的开销,但同时会增加CPU使用率以进行对象的序列化和反序列化操作。
Parquet向量化读取过程
我们先来总体上了解下其执行过程:
Spark SQL通过三个主要组件实现了矢量化的Parquet读取:VectorizedParquetRecordReader、VectorizedColumnReader和VectorizedValuesReader。
首先,VectorizedParquetRecordReader执行initBatch方法,在其中创建ColumnBatch实例和相关的ColumnVector实例后,然后Spark可以通过nextKeyValue方法逐个批次地读取Parquet文件。
其次,对于Parquet每个RowGroup,将创建一个新的VectorizedColumnReader数组,其中包含每个请求的列的VectorizedColumnReader实例。
再次,在执行时VectorizedParquetRecordReader中的nextBatch方法遍历VectorizedColumnReaders数组,逐列将批次数据读入相应的ColumnVectors中。
最后,将ColumnVectors返回给VectorizedParquetRecordReader,然后与ColumnBatch中的其他ColumnVector一起被上游调用者使用。

下面我们再来着重介绍下调用过程中的这几个类。
VectorizedParquetRecordReader
VectorizedParquetRecordReader是抽象RecordReader类的实现,它重写了nextKeyValue方法,调用该方法开始进行矢量化批量读取。
在执行nextKeyValue方法之前,需要构建一个指定了'capacity'(批次大小)和内存模式(堆内或堆外)的VectorizedParquetRecordReader实例。
initialize方法继承自SpecificParquetRecordReaderBase类,它从Parquet文件的页脚中获取ParquetMetadata,应用谓词下推过滤器以定位要读取的行组,然后根据请求的架构和定位的行组创建ParquetFileReader。

其核心的nextBatch源码如上,下面我们来进行解析下:
nextKeyValue方法调用resultBatch方法开始进行批量读取流程。
如果第一次调用nextKeyValue方法并且尚未创建ColumnBatch实例,则会创建ColumnBatch实例和所有请求列的ColumnVector实例。
当ColumnBatch准备就绪时,调用nextBatch方法读取批处理列值。当nextBatch方法开始读取新的资源组时,会创建一个新的VectorizedColumnReader实例数组,每个实例用于读取一个请求的列。
nextBatch方法循环遍历VectorizedColumnReader实例,并对每个VectorizedColumnReader实例调用readBatch方法。
下面我们来看下readBatch方法是如何进行实现的。
VectorizedColumnReader & VectorizedValuesReader
VectorizedColumnReader对象通过其相应的ColumnDescriptor进行初始化,ColumnDescriptor包含列的类型和模式位置信息,以及该列的ColumnChunkpageReader。(列块中数据页的列表和字典页信息)。
如上所述,当我们进行每个批量读取,VectorizedParquetRecordReader循环遍历所有的VectorizedColumnReader,并调用每个列读取器的readBatch方法。
在readBatch方法中我们传递了两个参数:1. total要在此批次中读取的行数; 2. column用于存储返回的列值的ColumnVector实例。
当第一次调用readBatch方法(尚未读取列块页)或当前页上的所有行都已读取完毕时,将调用readPage方法来初始化用于读取页数据的ByteBufferInputStream,并根据页面的编码方法创建正确类型的ValuesReader。

如上图所示,在构建VectorizedColumnReader时,如果存在列块的字典页,则读取该字典页。
从字典页初始化一个Dictionary实例,将在稍后用于解码批次。同时,设置列块级别的标志isCurrentPageDictionaryEncoded为true。
当调用VectorizedColumnReader的readBatch方法且isCurrentPageDictionaryEncoded标志为true时,数据页中的字典编码列值将被读取到dictionaryIds中,即一个WritableColumnVector实例。
然后,将调用VectorizedColumnReader的decodeDictionaryIds方法,通过查找先前读取的字典来解码dictionaryIds中的列值。

如果isCurrentPageDictionaryEncoded标志为false,即列块页没有进行字典编码,则将调用read[primitiveType]Batch方法来读取和解码列值。
一组read[primitiveType]Batch方法封装在VectorizedColumnReader类中,每个方法用于读取Parquet支持的原始数据类型的列值。
每个方法接受三个参数:1.rowId(批量读取的起始行ID)、2.num(要在此批次中读取的行数)和3.column(存储返回的列值的ColumnVector)。

在读取页面时,将调用VectorizedColumnReader的initDataReader方法,该方法从页面头获取编码方法并相应地创建VectorizedValuesReader实例(可以是VectorizedRleValuesReader或VectorizedPlainValuesReader)。
read[primitiveType]Batch方法将使用VectorizedValueReader实例解码列值,并将其放入结果ColumnVector中。
总结
当Spark进行列式读取时,它会批量处理多行数据,通过列式读取,Spark可以更好地利用硬件的向量化指令集,从而提升读取性能。
在列式读取过程中,Spark使用了两个关键的数据结构:ColumnBatch和ColumnVector。
ColumnBatch定义了一批读数据结构,并且有一个ColumnVectors列数组。当然在我们按批进行读取后,ColumnBatch也提供了按行消费的方法。
VectorizedParquetRecordReader是抽象RecordReader类的实现,它重写了nextKeyValue方法,用于开始进行矢量化批量读取。VectorizedParquetRecordReader在初始化时需要指定批次大小和内存模式。
在具体的读取过程中,VectorizedParquetRecordReader会遍历所有的VectorizedColumnReader,并调用它们的readBatch方法来读取列值。VectorizedColumnReader会根据列块页的类型(是否字典编码)来选择不同的读取方式,包括读取字典编码列值和读取原始数据类型的列值。
最后,这里挖个坑,为什么Iceberg的Parquet读取相比Spark要快呢?
可以看看Iceberg读取Parquet时调用的类是什么和Spark有什么不同。