Apache Arrow: 数据工程的未来

Apache Arrow其实诞生的非常早,初创团队主要来自于Dremio公司和由Apache Parquet(一种列式存储格式)的开发人员于2016年创建。

其最初的定位是通过定义一套通用数据结构和 API,使数据可以在不同的编程语言和计算引擎之间以零复制(zero-copy)的方式进行共享和交换,从而提高数据处理的效率。

Arrow 的核心数据结构是统一的列式内存格式,该格式采用了内存连续布局和零复制策略,以减少数据传输的开销。它支持对连续的列式数据使用现代处理器中包SIMD(单指令、多数据)进行向量化操作。此外,Arrow 还提供了一套丰富的数据操作接口,如过滤、转换、聚合等,以支持高效的数据分析和处理。

随着时间的推移,Apache Arrow 在逐渐扩展和发展,到现在Apache Arrow已经发展成为一个用于构建处理和传输大型数据集的高性能应用程序软件开发平台,它不仅支持多种编程语言(如C++, Java, Python, R等),还与许多主流的数据处理框架集成,如 Apache Spark、Pandas、TensorFlow 等。

为什么需要Arrow ?

现在的数据工程,使用着各种数据源(Mysql, HDFS)、格式(Parqurt, ORC, TFrecord)、系统和工具。

数据开发人员需要创建自定义数据结构来表示正在处理的内存中的数据集。而由于这些数据结构的“定制”特性,他们还必须同时开发序列化接口,以支持这个数据结构在不同文件格式、引擎、数据库客户端和其他数据传输接口之间进行转换。

这样做的结果是开发人员需要将数据从一种格式序列化为另一种格式,即花费了大量开发时间同时也浪费大量的 CPU 时间。此外,这些传输中其代码各式各样,数据格式也各式各样,这可能会在数据管道中带来开销、效率低下和错误。

Apache Arrow 使数据工程师能够跨不同系统和语言一致、高效地处理数据,而不必担心序列化、反序列化、转换或兼容性问题。

Apache Arrow 的一个关键组件是其内存中的列格式,这是一种与语言无关的标准化规范,用于表示内存中的结构化、表状数据集。它允许计算例程和执行引擎在扫描和迭代大数据块时最大限度地提高效率。特别是,连续的列式布局支持使用现代处理器中包含的最新 SIMD(单指令、多数据)操作进行矢量化。

Apache Arrow 对于数据应用程序有几个好处:

  • 性能

    Apache Arrow 利用列式数据表示的优势,例如更好的压缩、更快的扫描、矢量化操作和缓存效率。它还支持现代处理器的 SIMD(单指令、多数据)优化。此外,它消除了在系统或语言之间移动数据时昂贵的序列化和反序列化的需要。

  • 互操作性

    Apache Arrow 提供了一种通用的数据格式,可以在不同的系统和语言之间共享,而不会丢失任何信息或功能。它还促进了与 Arrow 数据一起使用的库和算法的重用,甚至跨语言也是如此。

  • 标准化

    Apache Arrow 提供了一个标准化的、与语言无关的规范,用于表示内存中的结构化、表状数据集,支持随机访问和基于流/扫描的任务。这简化了需要使用不同数据源和格式的数据工程应用程序的开发和维护。

  • 灵活性

    Apache Arrow 支持丰富的数据类型系统(包括嵌套和用户定义的数据类型),旨在帮助满足分析数据库系统、数据框架库等的需求。它还允许用户定义自定义元数据和格式扩展。

Arrow 和 Parquet间的区别?

Parquet 不是“运行时内存格式”,它属于文件格式。文件格式几乎总是需要反序列化为某种内存数据结构才能进行处理,而 Arrow 是内存中的数据结构。

Parquet 是一种存储格式,旨在利用先进的压缩和编码技术实现最大空间效率。当想要在存储千兆字节或更多数据的同时最大限度地减少磁盘使用时,它是理想的选择。这种效率是以相对昂贵的读入内存为代价的,因为 Parquet 数据不能直接操作,而是必须以大块的形式进行解码。

相反,Arrow 是一种内存格式,旨在直接有效地用于计算目的。Arrow数据未经过压缩(或者在使用字典编码时仅进行轻微压缩),而是以 CPU 的自然格式进行布局,以便可以在任意位置全速访问数据。

因此,Arrow 和 Parquet 相辅相成,通常在应用中一起使用。使用 Parquet 将数据存储在磁盘上并以 Arrow 格式将其读入内存将使您能够充分利用计算硬件。

不过Arrow项目平台中,也提供了一种”Arrow File“,它是一种进程间通信 (IPC) 机制来传输 Arrow 列数组的集合。它可以使用 Arrow“流格式”在进程之间同步使用,也可以通过首先使用 Arrow“文件格式”将数据持久保存在存储上来异步使用。

Arrow IPC 机制基于 Arrow 内存中格式,因此磁盘上表示和内存中表示之间不需要转换。因此,对 Arrow IPC 文件执行分析可以使用内存映射,从而避免任何反序列化成本和额外的数据副本。不过需要注意的是,这种”Arrow File“一般都是临时数据传输中使用。

Arrow 与 Protobuf 有何关系?

与Parquet类似,Protobuf不是“运行时内存格式”。Protobuf的表示不适合直接处理,其数据必须反序列化为像 Arrow 这样的内存形式才能进行处理。

例如,Protobuf 中的无符号整数被编码为 varint,其中每个整数可以具有不同的字节数,最后三位包含字段的连线类型。无法使用 CPU 来添加这种格式的数字。

Protobuf 有用于执行反序列化的库,但它们并不针对通用的内存格式。C# 程序反序列化的数据与Java 代码反序列化的数据,不具有相同的表示形式。您需要将数据从一种语言编组到另一种语言。

Arrow 避免了这种情况,但是以增加空间为代价的。对于在线上序列化某些类型的数据(例如单个记录或具有许多可选字段的稀疏数据),Protobuf 可能是更好的选择。就像 Parquet 一样,这意味着 Arrow 和 Protobuf 可以很好地互补。例如,Arrow Flight 使用 gRPC 和 Protobuf 来序列化其命令,而数据则使用二进制 Arrow IPC 协议进行序列化。

如何使用Apache Arrow?

Apache Arrow 可以通过多种方式使用。以下是如何在 Python 中使用 Apache Arrow 的一些示例:

1.读取和写入 Arrow 文件

您以使用该库从磁盘或内存pyarrow读取和写入 Arrow 文件(带有.arrow扩展名)。

import pyarrow as pa
import pyarrow.feather as feather

# Create a table from a list of dictionaries
data = [
    {"name""Alice""age"25"gender""F"},
    {"name""Bob""age"30"gender""M"},
    {"name""Charlie""age"35"gender""M"}
]
table = pa.Table.from_pydict(data)

# Write the table to an Arrow file
feather.write_feather(table, "data.arrow")

# Read the table from an Arrow file
table = feather.read_table("data.arrow")


2. Arrow 和 Pandas 之间的转换

可以使用该pyarrow库在 Arrow 表和 Pandas 数据框之间进行转换。例如:

import pyarrow as pa 
import pandas as pd 

# 创建 Pandas 数据框
df = pd.DataFrame(data) 

# 将数据框转换为 Arrow 表
table = pa.Table.from_pandas(df) 

# 将表转换回 Pandas 数据框架
df = table.to_pandas()


3.将 Arrow 与 Spark 结合使用

pyarrow在 Python 中使用 Spark DataFrame 时,您可以使用该库来启用 Arrow 优化。例如:

import pyarrow as pa 
import pyspark.sql.functions as F 

# 启用arrow优化
spark.conf.set ( "spark.sql.execution.arrow.pyspark.enabled" , "true" ) 

# 从 Pandas 数据帧创建 Spark DataFrame
 df = Spark.createDataFrame(pd.DataFrame(data)) 

# 应用一个使用 Spark UDF箭头数据
@F.pandas_udf( "int" , F.PandasUDFType.SCALAR ) 
def  add_one ( s: pd.Series ) -> pd.Series: 
    return s + 1

 df = df.withColumn( "age_plus_one", add_one(df.age))


4.使用内存池实现极快的访问

Arrow 的核心功能之一是零拷贝读取。这允许直接访问数据而无需复制内存缓冲区。为了充分利用这一点,我们需要使用 Arrow 内存池。

内存池允许您有效地重用分配的内存并避免不必要的分配或复制。以下是在 Python 中配置内存池的方法:

import pyarrow as pa 

# 创建内存池
pool = pa.default_memory_pool() 

# 将内存使用限制为 512 MB
 pool = pa.proxy_memory_pool(pool, 0.5 * 2 ** 30 ) 

# 使用此池写入表     
with pa.BufferOutputStream(pool=pool)as stream:
  writer = pa.RecordBatchStreamWriter(stream, table.schema) 

  writer.write_table(table) 
  writer.close()

通过内存池,可以在多次读取或写入中使用相同的内存缓冲区。这减少了总体内存使用量并避免了昂贵的分配。

让我们看一下处理大量数据的批处理管道的真实示例:

# 使用内存池处理批处理
with pa.BufferOutputStream(pool=pool) as out_stream: 

  for batch in batch_reader: 

    # 过滤批处理
    filtered = filter_func(batch) 

    # 写入过滤批处理
    writer = pa.RecordBatchStreamWriter(out_stream,filtered.schema) 
    writer. write_batch(过滤) 
    writer.close()

通过跨批次重用内存池,即使在处理大量数据时,我们也可以最大限度地减少分配。智能内存管理可显着提高性能。

5. 利用并行计算

分析工作负载通常需要执行复杂的操作,例如过滤、聚合、排序等。Arrow 允许通过将数据拆分为可以独立处理的块来利用并行性。

pyarrow.compute模块包含可应用于 Arrow 表或记录批次的矢量化函数:

import pyarrow as pa 
import pyarrow.compute as pc 

# 定义过滤函数
filter_fn = lambda batch: batch.column( 'age' ) > 30 

# 并行过滤表  
table = pc.filter(table,filter_fn,nthreads = 8

这使过滤器操作在 8 个线程上并行化。其他函数如reducesortcol_distinct也支持类似的并行化。

让我们看一个更复杂的分析示例:

# 将数据加载到 Arrow 表
table = load_data() 

# 在并行中过滤 30 岁以上的年龄
table = pc.filter(table, filter_age, nthreads= 8 ) 

# 并行聚合
totals = pc.sum(table, value=[ 'salary' , 'bonus' ]) 

# 将合计奖金除以 10
totals[ 'bonus' ] /= 10  

# 写入输出
write_file(totals)

这使用所有可用核心以矢量化方式应用过滤、聚合和算术等多种操作。通过 Arrow 计算模块进行并行分析,可以利用现代硬件实现更快的数据处理。

6. 与不同数据格式无缝互操作

现实世界的数据管道需要使用不同的数据源和格式。Arrow 生态系统对许多常见格式提供一流的支持:

Parquet:针对分析优化的柱状格式:

import pyarrow.parquet as pq 

# 将 Parquet 文件读入 Arrow Table
 table = pq.read_table( 'data.parquet' ) 

# 将 Arrow Table 写入 Parquet 文件
pq.write_table(table, 'output.parquet' )

JSON:Web API 和文档的简单文本格式:

import pyarrow as pa 
import pyarrow.json as json 

# 加载 JSON 文档
table = json.read_json( 'data.json' ) 

# 将 Arrow 转换为 JSON
 json_text = json.write_json(table)

此外还包括,ORC:Hadoop 生态系统中流行的列式格式。Protobuf:语言中立的数据序列化格式。以及Apache Arrow 生态系统中的更多格式。

这使得集成不同的系统变得容易——以 JSON 格式从 MongoDB 获取数据,使用 Arrow 进行处理,并将最终结果作为 Parquet 文件存储在云存储上。数据格式之间的互操作能力消除了端到端数据管道中的摩擦。

用 Arrow Flight 构建高性能分布式管道

现实世界的数据工程通常涉及在远程位置、服务器、数据库等之间移动大量数据。

Arrow Flight是一个使用 gRPC 或 REST 接口进行标准化高性能数据交换的框架。

让我们看一下使用 Arrow Flight 的示例管道:

# 服务器公开 Flight 端点
import pyarrow.flight as fl 

defserve  ():   
  Flight = fl.FlightServerBase()  
  Flight.register_endpoint( '/data' , data_provider)
  Flight.serve() 

# 客户端请求数据  
client = fl.FlightClient( ' grpc://server:port' ) 
# 获取机票
Ticket = fl.Ticket( 'data' ) 
# 将数据流式传输为 
Arrow table table = client.do_get(ticket)

服务器向 Flight 端点注册数据提供者类。客户端通过传递机票来请求数据。使用 Arrow 柱状格式在网络上高效地传输数据。

该架构具有强大的优势:

  • 将数据提供者与消费者分离

  • 使用 Arrow 标准化数据交换

  • 通过 gRPC 实现高性能流式传输

  • 简化不同系统的连接

您可以使用 Arrow Flight 构建可扩展的数据管道,以在端点之间高效地移动大量数据。

高性能Arrow DataFusion

Apache Arrow DataFusion是一个可扩展的查询引擎和数据库工具包,用Rust编写,使用Apache Arrow作为其内存格式。DataFusion 以及Apache Calcite、Facebook 的Velox和类似技术是下一代“解构数据库”架构的一部分,其中新系统构建在快速、模块化组件的基础上,而不是作为单个紧密集成的系统。

例如我们要实现下述功能:

# SELECT "UserID""SearchPhrase", COUNT(*)
# FROM hits
# GROUP BY "UserID""SearchPhrase"
# ORDER BY COUNT(*)
# DESC LIMIT 10;

这种SQL方法虽然简单,但速度慢且内存效率非常低,计算不到 1% 的数据集的结果需要超过 40 秒。

在arrow中我们可以使用python程序来执行查询统计任务。

import pandas as pd
import time
from collections import defaultdict
from operator import itemgetter

start = time.time()
+#hits = pd.read_parquet('hits_multi/hits_0.parquet', engine='pyarrow')
hits = pd.read_parquet('hits.parquet', engine='pyarrow')
print("{}s: Loaded {} rows from parquet".format(time.time() - start, len(hits)))

# build groups
counts = defaultdict(int)

for index, row in hits.iterrows():
    group = (row['UserID'], row['SearchPhrase']);
    # update the dict entry for the corresponding key
    counts[group] += 1

print("{}s: Counted groups".format(time.time() - start))

# Print the top 10 values
print (dict(sorted(counts.items(), key=itemgetter(1), reverse=True)[:10]))
print("{}s: Done".format(time.time() - start))

如上所示,在DataFusion使用了向量化分组,可以10秒内返回结果。

DataFusion 的哈希分组算法通过并行处理和优化的数据结构来提高查询性能,适合各种聚合操作。

关于DataFusion实现过程可以查看 https://howqueryengineswork.com/01-what-is-a-query-engine.html

总结

Apache Arrow 是一个开源的内存映射数据格式,基于现代平台的高性能的实现,具有以下特性:

  1. 数据的高效性:Apache Arrow 通过将数据存储在内存中的列式存储格式,在数据传输和处理过程中能够提供出色的性能。这种列式存储方式可以减少I/O操作,通过压缩和对数据类型进行优化,提高数据处理的速度和效率。

  2. 跨平台和跨语言支持:Apache Arrow 提供了多种语言的API,包括Python、Java、C++等,以及与常见数据处理框架(如Pandas、Spark等)的无缝集成。这使得不同语言和工具之间可以共享和处理相同的数据格式,方便了跨团队和跨平台的数据协作和处理。

  3. 大规模数据处理:Apache Arrow 的设计目标之一是支持大规模数据处理,特别是大数据场景下的快速数据访问和分析。它采用了零拷贝和并行处理的机制,有效地降低了数据的复制和转换成本,同时优化了计算资源的利用,使得在大规模集群上进行高性能的数据处理成为可能。

  4. 数据互操作性:Apache Arrow 提供了一种统一的数据格式和语义,可以无缝地将不同系统中的数据进行互操作。这样就避免了数据在不同系统之间的转换和处理过程中可能引入的错误和性能损失,提高了数据的可移植性和可扩展性。


如果觉得这篇文章对你有所帮助,
请点一下或者,是对我的肯定和支持~


请使用浏览器的分享功能分享到微信等