如何在MapReduce中使用Avro数据格式?

本文作为《Hadoop从入门到精通》大型专题的第三章后续,主要是对前一章中所提及的Protocol Buffers概念的补充以及如何在MapReduce中使用Avro数据格式。Avro改进了Protocol Buffers,因为其代码生成是可选的,并且以容器文件格式嵌入模式,允许动态发现和数据交互。Avro有一种机制来处理使用通用数据类型的模式数据,这部分将会在之后的章节中呈现(往期文章可自行查看文末链接)。

3.3.3 Protocol Buffers

谷歌开发人员发明了Protocol Buffers ,这是一种以紧凑而有效的方式在多种语言编写的服务之间交换数据的方式。Protocol Buffers现在是谷歌事实上的数据格式,超过12,000个谷歌.proto文件中定义了48,000多种不同的消息类型。

自2008年以来,MapReduce的目标一直是添加对Protocol Buffers的本机支持。因此,那时的许多开发人员一直在寻找在Hadoop中使用Protocol Buffers的方法。之前有一种可用的方法,即在SequenceFiles中编码Protocol Buffers,或者使用Elephant Bird或Avro,它们通过将自己包装在自定义文件格式中支持Protocol Buffers。但是,这些都不是最好的办法,直到MapReduce完全支持在Hadoop中使用Protocol Buffers。

现在可以通过多种方式在Hadoop中使用Protocol Buffers:

  • 可以在SequenceFiles中以二进制形式序列化Protocol Buffers对象;

  • Elephant Bird(https://github.com/kevinweil/elephant-bird)是一个来自Twitter的开源项目,支持自定义二进制文件格式的Protocol Buffers;

  • Parquet是一种柱状文件格式(之前的章节中曾介绍过),支持Protocol Buffers对象模型,并允许有效地将Protocol Buffers写入和读取为柱状形式。

在这些选项中,Parquet是使用Protocol Buffers的最优方式,不仅允许使用Protocol Buffers本地工作,而且还开发了许多数据处理工具。

Thrift是另一种数据格式,与Protocol Buffers一样,它没有MapReduce支持。同样,我们必须依赖其他工具来处理Hadoop中的Thrift数据。

3.3.4 Thrift

Facebook创建了Thrift以帮助实现高效的数据表示和传输。Facebook将Thrift用于许多应用程序,包括搜索,日志记录及其广告平台。使用Protocol Buffers的三个选项也适用于Thrift,其中最棒的数据序列化格式就是Avro。

3.3.5 Avro

Doug Cutting创建了一个数据序列化和RPC库Avro,以帮助改进MapReduce中的数据交换,互操作性和版本控制。Avro使用紧凑的二进制数据格式,你可以选择压缩以提高序列化时间。虽然它具有模式的概念,类似于Protocol Buffers,但Avro改进了Protocol Buffers,因为它的代码生成是可选的,并且以容器文件格式嵌入模式,允许动态发现和数据交互。

Avro文件格式如图3.11所示。模式被序列化为标题的一部分,这使得反序列化变得简单,并且放开了用户必须维护和访问与之交互的Avro数据文件之外的模式限制。每个数据块包含许多Avro记录,默认情况下为16 KB。

图3.11  Avro容器文件格式

数据序列化支持代码生成、版本控制和压缩,并且与MapReduce具有高度集成。同样重要的是模式演化,这就是Hadoop SequenceFiles没有吸引力的原因,其不支持模式或任何形式的数据演变。

本节,你将了解Avro的架构和代码生成功能,读取和编写Avro容器文件以及Avro与MapReduce集成的各种方法。最后,我还将关注Avro对Hive和Pig的支持。

首先,让我们来看看Avro的架构和代码生成。

Avro的schema和代码生成

Avro具有通用数据和特定数据的概念:

  • 通用数据允许以较低级别处理数据,而无需了解schema细节。

  • 特定数据允许使用代码生成的Avro原语与Avro协同工作,该原语支持使用Avro数据的简单且类型安全的方法。

该技术着眼于如何使用Avro中的特定数据。

问题

希望定义Avro模式并生成代码,以便可以使用Java中的Avro记录。

解决方案

以JSON格式编写schema,然后使用Avro工具生成丰富的API以与数据进行交互。

讨论

可以通过使用代码生成的类或其泛类使用Avro。在本文中,我们将使用代码生成的类。

在代码生成方法中,一切都以模式开始。第一步是创建Avro模式来表示stock数据输入:

Avro支持模式数据和RPC消息的代码生成。要为模式生成Java代码,请使用Avro工具JAR,如下所示:

生成的代码将放入hip.ch3.avro.gen包中。既然已生成代码,那么如何读取和编写Avro容器文件?

代码3.5 从MapReduce外部编写Avro文件

如上所示,我们可以指定应该用于压缩数据的压缩编解码器。该示例使用的是Snappy,它是读写速度最快的编解码器。

以下代码演示了如何从输入文件的行封送Stock对象。如下所示,生成的Stock类是包含一组setter(和匹配的getter)的POJO:

如何读刚刚写进去的文件:

继续执行这个编写器和读者对:


Avro捆绑了一些工具,可以轻松检查Avro文件的内容。要以Avon格式查看Avro文件内容,只需运行以下命令:

以上是假定该文件存在于本地文件系统中。同样,你可以使用以下命令获取Avro文件的JSON表示:


我们可以在没有任何选项的情况下运行Avro工具来查看可以使用的所有工具:

tojson工具的一个缺点是不支持在HDFS中读取数据。因此,我将一个实用程序与AvroDump捆绑在一起,可以在HDFS中转储Avro数据的文本表示,我们可以用它来检查Avro MapReduce作业输出:

此实用程序支持多个文件(需要以CSV分隔)和通配,因此可以使用通配符。以下示例显示如何将生成Avro输出的MapReduce作业内容转储到名为mr-output-dir的目录中:

选择在MapReduce中使用Avro的方法

Avro支持多种方式在MapReduce中使用Avro数据。本文列举了处理数据的不同方法,并提供了有关哪种情况需要采用哪种方法的指导。

问题  

希望在MapReduce作业中使用Avro,但不清楚应该选择哪种可用的集成选项。

解决方案

了解有关每个集成选项的详细信息,并选择最适合的用例。

讨论

可以通过三种方式在MapReduce中使用Avro:

  • Mixed-mode - 希望将Avro数据与作业中的非Avro数据混合时使用;

  • Record-based - 当以非Key/Value方式提供数据时很有用;

  • Key/value-based - 用于当数据必须适合特定模型时。

Mixed-mode(混合模式)

此用例适用于满足以下任一条件的情况:

  • mapper输入数据不是Avro格式;

  • 不希望使用Avro在mapper和Reducer之间发出中间数据;

  • 作业输出数据不是Avro形式。

在任何情况下,Avro的mapper和reducer类都无法帮助你,因为它们的设计是假设Avro数据在MapReduce作业中端到端地流动。在这种情况下,我们将需要使用常规MapReduce的mapper和reducer类,并允许使用Avro数据方式构建作业。

Record-based

Avro数据是基于记录的,与基于key/value的MapReduce相比,会导致阻抗不匹配。为了支持Avro基于记录的root,Avro捆绑了一个不基于KV的mapper类,而只提供具有单个记录的派生类。

Key/value-based

如果Avro数据内部遵循KV结构,则可以使用一些Avrosupplied mapper类来转换Avro记录并以key/value形式将其提供给mapper。使用此方法,只能使用字面上具有“key”和“value”元素的模式。

概要

选择与Avro正确的集成级别是输入和输出函数,以及希望如何处理Avro内部的数据。本文检查了与Avro集成的三种方法,以便可以为用例选择正确的方法。

在MapReduce中混合Avro(Mixing Avro)和非Avro(non-Avro)数据

MapReduce中此级别的Avro集成适用于具有非Avro输入并生成Avro输出的情况,反之亦然。在这种情况下,Avro mapper和ruducer类不适用,本文将使用Avro混合模式。

问题

希望在MapReduce作业中以混合模式使用Avro,Avro捆绑的mapper和reducer类不支持。

解决方案

使用低级方法设置作业,并使用常规Hadoop mapper和reducer类通过MapReduce作业驱动Avro数据。

讨论

Avro附带了一些mapper和reducer类,可以将它们子类化以与Avro一起使用,它们在mapper和reducer交换Avro对象的情况下非常有用。但是,如果没有要求在map和reduce任务之间传递Avro对象,最好直接使用Avro输入和输出格式类,正如以下代码所示,它产生了所有stock值的平均值。

我们先来看看作业配置。本文的主要任务是使用Avro格式消耗stock数据并生成stock平均值。为此,我们需要对两个模式进行配置,并指定Avro输入和输出格式类:

接下来是Map类。整个Avro记录作为map功能的输入key提供,因为Avro支持记录,而不是key/value对(虽然,之后我们也可以看到,Avro确实有办法使用key/value向map提供数据,如果Avro架构具有称为key和value的字段)。从实施角度来看,map功能从stock记录中提取必要的字段并发送到reducer,stock symbol 和stock price作为key/value对:

此处之所以使用较旧的MapReduce API(org.apache.hadoop.mapred API)是因为此处使用的AvroInputFormat和AvroOutputFormat类仅支持旧API。

最后,reduce函数总结所有的stock prices并输出平均价格:

可以按如下方式运行MapReduce代码:

MapReduce作业正在从输入中输出不同的Avro对象(StockAvg),可以通过编写代码(未列出)来转储Avro对象以验证作业是否产生了期望的输出:

总结

如果不希望以Avro形式输出中间map,或者有非Avro输入或输出,则此技术非常有用。接下来我们将看一下如何在MapReduce中用Avronative方法处理数据。

在MapReduce中使用Avro记录

与SequenceFile不同,Avro不是本机key/value序列化格式,因此可能需要一些工具才能使其与MapReduce一起使用。本文将检查特定于Avro的mapper和reducer类,这些类公开了可用于输入和输出数据的基于记录的接口。

问题

希望在MapReduce作业中使用Avro进行端到端数据传输,并且以面向记录的形式与输入和输出数据交互。

解决方案

扩展AvroMapper和AvroReducer类以实现MapReduce作业。

讨论

Avro附带了两个类,抽象出了MapReduce的key/value特性,并且暴露record-based的API。本文将实现与之前相同任务的MapReduce作业(计算每个stock symbol的平均价格),并在整个作业中使用Avro。首先,让我们来看看Mapper类,它将扩展AvroMapper:

首先要注意的是,类定义了两种类型,而不是MapReduce中的四种类型。AvroMapper抽象出mapper输入和输出key/value特征,用单一类型替换每个特征。

如果仅map作业,则定义的类型将是输入和输出类型。但是,如果正在运行完整的MapReduce作业,则需要使用Pair类,以便可以定义map输出的key/value对。Pair类要求存在用于key和value部分的Avro模式,这就是使用Utf8类而不是直接Java字符串的原因。

现在让我们来看看AvroReducer实现, 需要定义三种类型:map输出的key,value类型以及reducer输出类型:

现在可以在驱动程序中将它们全部连接起来,本文将定义输入和输出类型以及所需的输出压缩(如果有):

在作业完成后检查输出:

总结

在整个MapReduce作业中以Avro形式保存数据的情况下,此技术非常方便且不需要输入或输出数据基于key/value。

但是,如果确实需要基于key/value的数据,并且仍然希望使用紧凑型序列化大小和内置压缩等该怎么办?

在MapReduce中使用AvroKey/Value对

MapReduce的原生数据模型是key/value对,Avro是基于记录的,Avro没有对key/value数据的本机支持,但Avro中存在一些类帮助建模Key/Value数据并在MapReduce中本地使用。

问题

希望将Avro用作数据格式和容器,但希望使用Avro中的Key/Value对进行数据建模,并将它们用作MapReduce中的本机Key/Value对。

解决方案

使用AvroKeyValue,AvroKey和AvroValue类来处理AvroKey/Value数据。

讨论

Avro有一个AvroKeyValue类,封装了两个名为key和value的通用Avro记录。AvroKeyValue用作辅助类,可以轻松读取和写入Key/Value数据,这些记录的类型可以自定义。在这种技术中,我们将重复进行计算stock平均价格的MapReduce作业,但这次使用Avro的Key/Value框架。首先需要为工作生成输入数据,在这种情况下,我们将stock symbol放在key中,Stock object 放在value中:

继续生成HDFS文件,其中包含Key/Value格式的stock数据:

如果想知道刚生成文件的Avro架构定义,可以使用AvroDump实用程序显示文件内容:

对于MapReduce代码一次性定义mapper,reducer和driver:

AvroKey和AvroValue包装器用于在mapper中提供输入数据,以及在reducer中输出数据。巧妙之处在于,Avro非常智能,可以支持Hadoop Writable对象并自动将其转换为Avro对象,这就是为什么不需要告诉Avro输出密钥模式类型。

可以使用以下命令运行MapReduce作业:

同样,可以使用AvroDump工具查看输出:

总结

这就是在MapReduce中处理数据的三种Avro方法。每种方法都适用于特定任务,可以选择最适合需求的方法。

控制MapReduce中的排序方式

如果决定使用Avro数据作为中间map输出,你可能想知道对分区、排序和分组工作的控制方式。

问题

希望控制MapReduce如何对reducer输入进行排序。

解决方案

修改Avro架构以更改排序行为。

讨论

如果将Avro对象用作mapper中的键输出,则默认情况下会发生以下情况:

  • Avro对象中的所有字段都用于分区,排序和分组。

  • 字段按模式中的序号位置排序,这意味着如果有一个包含两个元素的模式,则模式中的第一个元素首先用于排序,然后是第二个元素。

  • 在元素内,使用特定于类型的比较进行排序。如果要比较字符串,则排序将是词典,如果正在比较数字,则使用数字比较。

某些行为可以更改,以下是Stock架构的修改版本:

可以通过使用order属性对其进行修饰并指定应使用降序来修改字段的排序行为。或者,通过设置要忽略的顺序来排除分区、排序和分组中的字段。

请注意,这些是架构范围的设置,并且没有简单方法来指定基于每个作业的自定义分区/排序/组设置,你可以继续编写自己的分区,排序和分组函数(就像对Writable一样),但如果Avro有辅助函数来简化此过程,那将非常有用。

Avro和Hive

直到最近,Hive项目才有内置的Avro支持,此技术介绍了如何在Hive中使用Avro数据。

问题

希望在Hive中使用Avro数据。

解决方案

使用Hive的Avro Serializer/Deserializer。

讨论

Hive版本0.9.1和更新版本捆绑了Avro SerDe,这是Serializer/Deserializer的缩写,允许Hive从表中读取数据并将其写回到表中。需要将本文捆绑的Avro架构复制到HDFS中,并创建一个包含Avro stock记录示例的目录:

接下来,启动Hive控制台并为刚刚创建的目录创建外部Hive表,需要在HDFS中指定Avro架构位置,用HDFS用户名替换YOUR-HDFS-USERNAME:

AvroSerDe实际支持三种方式定义Avro表模式,本文选择最有可能在生产中使用的方法。

像任何Hive表一样,我们可以查询Hive来描述表的模式:

运行查询以验证一切是否正常。以下Hive查询语言(HiveQL)将计算每个代码的stock记录数:

如果想将数据写入Avro支持的Hive表,以下示例显示了如何复制stocks表中的记录子集并将其插入新表中。此示例还重点介绍了如何使用Snappy压缩编解码器对新表进行写入:

Avro和Pig

就像Hive一样,Pig也有对Avro的内置支持。

问题

使用Pig读取和写入Avro数据。

解决方案

在Pig的Piggy Bank库中使用AvroStorage类。

讨论

Piggy Bank是一个包含Pig实用程序的有用集合库,其中一个是可用于在HDFS中读取和写入Avro数据的AvroStorage类。 此处将读入部分stock数据,执行简单聚合并将过滤后的数据存储回HDFS。

开始之前,我们将一些Avro的stock数据加载到HDFS目录中:

在Pig-land中,第一步是注册AvroStorage工作所需的JAR,可能需要搜索与你正在使用的Hadoop发行版捆绑在一起的JAR特定位置。以下代码中的位置假定Apache Hadoop和Pig安装在/usr/local下:

接下来,将stock加载到Pig中,然后使用LOAD和DESCRIBE运算符显示架构详细信息:

请注意,不必提供有关Avro架构的详细信息,这是因为使用Avro容器格式具有嵌入在标头中的架构。如果文件没有嵌入架构,AvroStorage仍然可以支持数据,但需要将Avro架构上传到HDFS并使用“schema_file”选项。

验证Avro和Pig是否正在协同工作,可以执行简单聚合并计算每个stock的记录数:

以下示例显示了如何在Pig中输出Avro数据,该示例根据输入数据过滤Google stock,并将它们写入HDFS的新输出目录。 这也展示了如何使用Snappy压缩作业输出:

Avro数据写入HDFS时需要指定持久存储数据的Avro架构,以上示例使用data选项告诉AvroStorage使用嵌入在输入目录下的文件中的Avro架构。

与加载文件一样,还有其他方法可以告诉AvroStorage架构详细信息,这些方法记录在Pig的wiki上。

总结

最后几种技术已经证明了将Avro与MapReduce,Hive和Pig一起使用是多么容易和直接。使用Avro存储数据可提供许多有用的免费功能,例如版本控制支持,压缩,可拆分性和代码生成。Avro与MapReduce,Hive,Pig以及Impala和Flume等众多其他工具的强大集成意味着值得作为备选数据格式。

到目前为止,我们一直专注于基于行的文件格式,这些格式并不总是布局数据的最佳方式。在下一节,我们将了解柱状存储的优势并查看Parquet(柱状存储)实例。

相关文章:

1、《第一章:Hadoop生态系统及运行MapReduce任务介绍!》链接:    http://blog.itpub.net/31077337/viewspace-2213549/

2、《学习Hadoop生态第一步:Yarn基本原理和资源调度解析!》链接:    http://blog.itpub.net/31077337/viewspace-2213602/

3、《MapReduce如何作为Yarn应用程序运行?》链接:    http://blog.itpub.net/31077337/viewspace-2213676/

4、《Hadoop生态系统各组件与Yarn的兼容性如何?》链接:    http://blog.itpub.net/31077337/viewspace-2213960/

5、《MapReduce数据序列化读写概念浅析!》链接:   http://blog.itpub.net/31077337/viewspace-2214151/

6、《MapReuce中对大数据处理最合适的数据格式是什么?》链接:  http://blog.itpub.net/31077337/viewspace-2214325/

7、《如何在MapReduce中使用SequenceFile数据格式?》链接: http://blog.itpub.net/31077337/viewspace-2214505/

请使用浏览器的分享功能分享到微信等