大家好,我是Tim。
当前Apache Spark分布式计算框架,由于其运行的健壮性和易于使用已经成为数据计算中离不开的利器。
然而随着机器硬件的不断升级,计算查询引擎中也一直在诞生着新的有趣的小东西,今天我们就来聊一聊用Rust实现的DataFusion查询引擎。
有人说,”十年前是收费的数据库软件时代,那时通过购买一套高性能的数据库,基于自己的硬件系统进行运维;”
“现在是开源的大数据系统组装的时代,基于开源的一系列大数据产品进行组合构建,或者直接购买已经形成一套的大数据组合公有云产品;”
“未来将是形成一个软件去拉通所有的开源大数据产品,同时优化在不同产品间的数据提取、转换和快速收集,实现更高效的组合复杂多变的数据产品的时代,而这个产品就是Apache Arrow DataFusion“
DataFusion到底是什么?
从接触到DataFusion开始,我就一直在想DataFusion 到底是什么,如果没有像 Spark 这样运行的健壮且易于使用的分布式框架,那还有什么意义呢?
如果说DataFusion是Pandas 替代品,那么 Spark 现在也有一个 Pandas 补充 API,那它的优势是什么。
有人说DataFusion利用 Rust 和 Arrow 的内存模型,实现了非常高的性能。
但我在简单对比下PySpark和DataFusion读取相同CSV文件进行统计计数比较上来看,其运算效率并没有比Spark快,当然这其中我并未完全对齐使用资源,但这也说明了其性能至少没有想象中的快。
那么什么是DataFusion,它的需求是什么呢?
首先,DataFusion 是一个可扩展的查询执行框架,用 Rust 编写,使用Apache Arrow作为其内存格式。
其次,DataFusion 支持用于构建逻辑查询计划的 SQL 和 DataFrame API,以及能够使用线程对分区数据源(CSV 和 Parquet)并行执行的查询优化器和执行引擎。
DataFusion 还支持通过Ballista (Arrow) crate进行分布式查询执行。
所以,要了解DataFusion需求的根源,就要了解Arrow的发展过程,如下图所示:

Apache Arrow从最初的基于内存的存储库,慢慢演化出自己的一套独立的数据类型系统,以及自己的查询引擎。
Apache Arrow作为各种数据产品中的“中转站”,提供SQL化的查询引擎,可以方便用户快速预览和探索大规模数据集,通过SQL化的能力方便的将不同数据源的数据合并、转换为统一的格式。
这也就是DataFusion诞生的初衷。
另外需求注意的是,Data Fusion 是单个节点,不是分布式的。如果你想分布式使用它,你可以使用Ray 或者Ballista (Arrow)。
DataFusion有什么特点?
-
高性能
基于Rust,不用进行垃圾回收;基于Arrow内存模型,列式存储,方便向量化计算。
-
易于连接
作为 Apache Arrow 生态系统(Arrow、Apache Parquet和 Flight)的一部分,DataFusion 与大数据生态系统的其他部分配合良好。
-
易于嵌入
DataFusion 无需修改即可用作嵌入式 SQL 引擎,也可以进行定制并用作构建新系统的基础。
-
易于扩展
DataFusion 提供了灵活的扩展机制,可以通过自定义函数、优化器规则和执行器插件来扩展其功能和性能。

从上面DataFusion查询引擎执行图可以看出,DataFusion几乎在任何地方都留有可扩展的接口,这使得目前有很多系统都是基于DataFusion进行定制构建,如专业分析数据库系统Ballista (Arrow),更快的Spark运行时替换 blaze-rs,另一个数据库的SQL支持dask sql等。

DataFusion查询引擎主要如上图所示构成,分为以下几个部分:
前端负责语法解析、语义分析和生成逻辑计划。
查询中间主要包括逻辑计划和执行计划,其中涉及很多表达式、关系算子和优化规则等。
执行运行时主要包括分配资源、向量化计算。
DataFusion和DuckDB的区别:
DuckDB 是一个开源的进程中OLAP数据库。与 DataFusion 一样的是,它支持非常快速的执行,无论是从其自定义文件格式还是直接从 parquet 文件。
与 DataFusion 不同,它是用 C/C++ 编写的,主要由用户直接用作无服务器数据库和查询系统,而不是用作构建数据库系统的库。
DataFusion和Velox的区别:
Velox 是一个执行引擎。与 DataFusion 一样,Velox 旨在为构建类似数据库的系统提供可重用的基础。
与 DataFusion 不同,它是用 C/C++ 编写的,不包含 SQL 前端或规划/优化框架。
DataFusion实现细节简析
从DataFusion查询引擎的框架上可以看出,其与Spark类通用的查询引擎在框架设计上大同小异。
只是相对Spark这种已经打磨多年的引擎来说,相对比较精简一点。
1. 列式处理的中间表示形式

DataFusion在处理数据时会从根节点进行poll数据,每次poll数据都会进行一个RecordBatch,在进行计算时向量化地处理RecordBatch。
RecordBatch的组成如上图所示,其包含数据列和schema性能两部分。schema会带有列名和数据类型。
2. 表达式运算
DataFusion在实施表达式运算时,使用了Arrow提供的向量化计算方法来加速运算。
3. 执行计划
DataFusion实现的物理计划算子具有以下特性:
异步:避免了阻塞I/O,如下所示
流式:数据是流式处理的
向量化:每次可以向量化地处理一个
RecordBatch分片:每个算子都可以并行,可以产生多个分片
多核

总结
DataFusion本身只是一个简单,高效,可扩展的查询引擎框架,可以轻易地将DataFusion嵌入服务中作为查询引擎,也可以使用DataFusion构建自己的数据库系统。
使用Rust实现的DataFusion是一件很棒的事,它不必像Spark一样沉重,也不像Pandas这样可怕,作为非常方便扩展的查询引擎,我相信它可以做好不同数据组件间连接的桥梁。
不过我现在可能不会在任何生成环境中使用DataFusion,最多可能只是用于一些本地/小规模的测试。目前DataFusion还在发展,其性能依然存在很大优化空间。
但作为快速预览大规模数据集或者将不同数据源的数据合并、转换为统一的格式的工具还是值得尝试。