MapReduce如何作为Yarn应用程序运行?

本文作为《Hadoop从入门到精通》第二章的第二节,主要介绍Yarn出现之后,MapReduce体系架构发生的改变、其在Hadoop集群中的运行原理以及让MapReduce作为Yarn之上的应用程序正常运行并保持向后兼容性等内容。

我们必须承认,从Hadoop 1.x过渡到Hadoop 2.x经历了巨大的改变,MapReduce便是改变的一部分。在Hadoop 1.x中,MapReduce是本地数据处理的唯一方法。当时,业界普遍总结出的一条结论是:MapReduce只能支持4000节点左右的主机。这是因为MapReduce的核心是JobTracker,所有任务信息都汇总到Job Tracker,存在单点故障不说,任务过多会造成极大的内存开销,失败风险也陡然增加。

从操作角度来看,当时的MapReduce有任何修复或更新都会强制进行系统级别的升级更新,强制让分布式集群系统的每一个用户端同时更新,而这个过程用户本身可能并不认可,但不得不浪费时间适应新的版本,这样的用户体验是极其糟糕的。

Yarn的出现是为了让Hadoop集群可以运行任何类型的作业,并且唯一要求是应用程序遵循Yarn规范,这意味着MapReduce必须成为Yarn应用程序,并要求Hadoop开发人员重写MapReduce的关键部分。

2.2.1 解析Yarn MapReduce应用程序

我们可以将Yarn理解为集群中的分布式操作系统,而MapReduce便是其上的一个应用程序,我们必须对MapReduce架构进行更改以移植到Yarn。

图2.10 MapReduce2 Yarn应用程序交互

每个MapReduce任务都可作为单独的Yarn应用程序执行,当启动新的MapReduce任务时,首先,客户端会计算输入分片并将其与剩下的任务资源一起写入HDFS。其次,客户端与ResourceManager通信以为MapReduce任务创建ApplicationMaster。ApplicationMaster实际上是一个容器,因此当该资源在集群上可用时,ResourceManager将与NodeManager通信以创建ApplicationMaster容器。然后,MapReduce ApplicationMaster(MRAM)负责调度和协调资源并监控容器状态。MRAM从HDFS中提取输入信息,以便与ResourceManager通信时,可以请求在其输入数据本地节点上启动map容器。对ResourceManager的容器分配请求依赖于在ApplicationMaster和ResourceManager之间传递的心跳消息,心跳可能包含为应用程序分配容器等详细信息,这部分不需要过分关心,Yarn可以帮我们实现。

数据局部性作为体系结构的重要组成部分——当它请求map容器时,MapReduce ApplicationManager将使用输入分片的详细信息将容器分配给包含输入分片的节点,ResourceManager也将在这些输入分片节点进行容器分配的最佳尝试。MapReduce ApplicationManager一旦被分配容器,就会与NodeManager通信以启动map或reduce任务。此时,map/reduce进程的行为与在MRv1中的工作方式非常相似。

Shuffle

MapReduce中的shuffle负责对映射器输出排序并分配到相应的reducers,MapReduce 2并没有从根本上改变这一做法,主要区别在于map输出通过ShuffleHandlers获取,ShuffleHandlers辅助Yarn服务在每个从节点上运行。对shuffle进行了一些次要内存管理调整,例如,不再使用io.sort.record.percent。

JobTracker怎么办?

你可能会注意到此架构中不再存在JobTracker,但其在此前的版本中作用重大。事实上,JobTracker的调度部分作为通用资源调度程序移动到Yarn ResourceManager中。JobTracker的剩余部分主要是关于运行和已完成作业的元数据两个部分,每个MapReduce ApplicationMaster都托管一个UI,用于呈现当前作业的详细信息,一旦作业完成,其详细信息将被推送到JobHistoryServer,后者聚合并呈现所有已完成作业的详细信息。

2.2.2 配置MapReduce 2到Yarn的端口

本小节,我们将介绍一些受影响的常用属性以及MapReduce 2中的新属性。(具体弃用属性可在官网自行搜索,此处不一一列举)

弃用属性

大多数MapReduce 1(和许多HDFS)属性都被弃用,而不是简单的更改了属性名称,因此在Hadoop 3.x以上版本中运行时,更新属性列表是必须的。幸运的是,当运行MapReduce作业时,我们可以在标准输出上获得所有已弃用配置属性的转储,其示例如下:

2.2.3 向后兼容性

MapReduce的属性明显发生了很大改变,因此很多人会担心其向后兼容性是否受到影响。对于具有大量用户群的系统,向后兼容性是一个重要的考虑因素,因为它可以确保快速升级到新版本,而用户几乎是无感知的。

脚本兼容性

与Hadoop捆绑在一起的脚本保持不变,这意味着可以继续使用hadoop jar ...来启动作业,并且主要的Hadoop脚本依旧可用,与Hadoop捆绑的其他脚本也一样。

配置

随着Yarn的引入以及MapReduce成为一个应用程序,MapReduce 2不推荐使用MapReduce 1的属性名称,有些不再有效。将MapReduce移植到Yarn时,开发人员尽最大努力保持了现有MapReduce应用程序的向后兼容性。它们能够实现代码兼容性,但在某些情况下无法保持二进制兼容性:

  • 代码兼容性意味着只要重新编译代码,现在存在的所有MapReduce程序都可在Yarn上正常运行。这意味着无需复杂修改,代码即可在Yarn上运行。

  • 二进制兼容性意味着MapReduce字节码将在Yarn上保持不变。换句话说,不必重新编译代码也可与Hadoop 1运行相同的类和JAR,它们在Yarn上也可以正常工作。

“旧”MapReduce API(org.apache.hadoop.mapreduce包)代码是二进制兼容的,因此,如果现有MapReduce代码仅使用旧的API,那么只需设置而不需重新编译代码。对于“新”MapReduce API(org.apache.hadoop.mapreduce)的某些用法,则要考虑正在使用的API是否已更改,即类被更改为接口,如下:

  • JobContext

  • TaskAttemptContext

  • Counter

如果正在使用新的MapReduce API并且需要在两个版本的Hadoop上运行代码,那么这就引出了一个问题。如果正在使用“新”MapReduce API并拥有自己的Input/OutputFormat类或使用计数器,那么可能需要重新编译JAR才能与MapReduce 2一起使用。如果必须同时支持MapReduce 1和2,可以创建两套针对每个版本的MapReduce的JAR,最终你会得到一个十分复杂的系统。

问题:正在使用与MapReduce 2不是二进制兼容的MapReduce代码,并且希望能够以与MapReduce两个版本兼容的方式更新代码,应该如何做呢?

解决方案:使用可解决API差异的Hadoop兼容库。Elephant Bird项目包含一个HadoopCompat类,可动态确定正在运行的Hadoop版本,并使用Java反射来调用适当的方法以使用Hadoop版本。以下代码是用法示例,在InputFormat实现中,TaskAttemptContext从类更改为接口,并且HadoopCompat类用于提取Configuration对象:

import com.alexholmes.hadooputils.util.HadoopCompat; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class MyInputFormat implements InputFormat 
{ 
@Override
 public RecordReader createRecordReader(InputSplit split, 
                                              TaskAttemptContext context) 
throws IOException { 
final Configuration conf = HadoopCompat.getConfiguration(context); 
     ... 
     }
 }

表2.5 MapReduce新旧版本非二进制兼容的公共类和方法

HadoopCompat类还有一个名为isVersion2x的方法,如果类确定此运行时针对Hadoop 2.x版本,则会返回一个布尔值。这只是一个示例,完整信息可参阅Github上Elephant Bird项目的HadoopCompat页面:

https://github.com/ kevinweil/elephant-bird/blob/master/hadoop-compat/src/main/java/com/twitter/ elephantbird/util/HadoopCompat.java.  

2.2.4 运行MapReduce 2作业

在Hadoop 2.x及以上版本中运行MapRedcue任务的方式与MapReduce 1中的操作基本相同,只是我们可以使用Yarn相关命令来操作。例如,运行Hadoop示例JAR中捆绑的pi作业方式:

$ yarn jar ${HADOOP_HOME}/share/hadoop/mapreduce/*-examples-*.jar pi 2 10
Estimated value of Pi is 3.1428000

2.2.5 监视正在运行的任务并查看已归档任务

运行MapReduce任务时,监控和调试非常重要,可以查看任务状态并访问任务日志。在MapReduce 1中,这些工作全部使用JobTracker UI执行,该UI可用于查看正在运行或已存档任务的详细信息。正如2.2.1中所强调的,MapReduce 2中不再存在JobTracker;它已被ApplicationMaster中的特定UI替换,JobHistoryServer用于显示已完成作业。 为了获取map和reduce任务日志,UI重定向到NodeManager。确定ResourceManager UI的运行位置可以通过检查Yarn.resourcemanager.webapp.address或Yarn.resourcemanager.webapp.https .address(如果需要HTTPS访问)的值来检索ResourceManager主机和端口。在伪分布式的情况下,http:// localhost:8088(或HTTPS的端口8090)将主机和端口复制到浏览器就足以访问UI,因为不需要URL路径。

 

图2.11 Yarn ResourceManager UI,显示当前正在执行的应用程序


2.2.6 Uber任务

运行小型MapReduce任务时,资源调度等过程所需的时间通常占整个运行时的很大一部分。在MapReduce 1中,没有任何关于此的开销,但MapReduce 2已变得更加智能,现在可以满足快速运行轻量级任务的需求。

运行小型MapReduce任务

当处理的数据较少时,我们可以考虑在MapReduce ApplicationMaster中运行,因为删除了MapReduce需要花费的额外时间,并减少了map和reduce进程。

图2.12 JobHistory UI,显示已完成的MapReduce应用程序

如果有一个可在小型数据集上运行的MapReduce任务,并且希望避免创建map和reduce进程的开销。 那么,你可以配置以启用uber任务,这将在与ApplicationMaster相同的进程中运行mapper和Reducer。Uber任务是在MapReduce ApplicationMaster中执行作业,ApplicationMaster不是与ResourceManager联系以创建map和reduce容器,而是在其自己的进程中运行map和reduce作业,并避免启动和与远程容器通信的开销。要启用uber作业,需要设置以下属性:

mapreduce.job.ubertask.enable=true

表2.6 列出了一些控制uberization作业质量的附加属性

运行uber作业时,MapReduce会禁用 预测执行 (speculative execution ),并将任务的最大尝试次数设置为1。Uber作业是MapReduce后期新增功能,它们只适用于Yarn。

如今,我们已经看到了Yarn生态系统的快速成长。我们生活在多数据中心运行多个不同系统的世界,所有系统都有其存在的价值,但对系统管理员和架构师带来了前所未有的挑战,他们需要支持这些系统并保证它们正常运转,系统之间的数据交换成本昂贵且异常复杂,每个系统都必须解决相同的分布式问题,例如容错、分布式存储、日志处理以及资源调度。因此,Yarn与其他生态的集合一直是非常有意思的话题,在本章的最后一节,我们将会列举Yarn与其他生态集合示例。

相关文章:

1、第一章:Hadoop生态系统及运行MapReduce任务介绍!(链接: http://blog.itpub.net/31077337/viewspace-2213549/)

2、学习Hadoop生态第一步:Yarn基本原理和资源调度解析!(链接:http://blog.itpub.net/31077337/viewspace-2213602/)

请使用浏览器的分享功能分享到微信等