Verifying Uber Mode for MapReduce Jobs on Hadoop 2.6.0
Preface
In some cases, the MapReduce jobs running on a Hadoop cluster do not process much data. If such a job is nevertheless divided into many splits, frequently creating a Container for each map or reduce task inevitably increases the cluster's resource consumption, and the overhead of allocating those Containers adds latency to the tasks as well. Running all of these small tasks inside a small number of Containers would solve both problems. Fortunately, Hadoop already ships this capability; we only need to understand how it works and apply it.
Uber mode is the ready-made solution to exactly this class of problems. This article verifies the effect of Uber mode through testing; in a real production environment, you should still evaluate your own workloads case by case.
Uber Mode
Uber mode is an optimization for small jobs. Instead of requesting and allocating a separate Container for every task, it runs all of a job's tasks serially inside a single Container, executing the map tasks first and then the reduce tasks. So what kind of job does the MapReduce framework consider small? A job must satisfy all of the following conditions (a configuration sketch follows the list):
- The number of map tasks is no greater than the value of mapreduce.job.ubertask.maxmaps (default: 9);
- The number of reduce tasks is no greater than the value of mapreduce.job.ubertask.maxreduces (default: 1);
- The input size is no greater than the value of mapreduce.job.ubertask.maxbytes (default: the size of one HDFS block);
- The memory and CPU required by the map and reduce tasks must not exceed the resources available to the MRAppMaster (the ApplicationMaster of a MapReduce job).
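These thresholds can also be overridden per job in driver code. Below is a minimal sketch; the keys are the standard MapReduce property names, while the values are arbitrary examples rather than recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class UberConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Turn Uber mode on (it is off by default).
    conf.setBoolean("mapreduce.job.ubertask.enable", true);
    // Thresholds for what counts as a "small" job; the values here are examples.
    conf.setInt("mapreduce.job.ubertask.maxmaps", 9);            // default: 9
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);         // default: 1
    conf.setLong("mapreduce.job.ubertask.maxbytes", 128L << 20); // default: one block
    Job job = Job.getInstance(conf, "uber-demo");
    // ... configure mapper, reducer, input and output paths as usual ...
  }
}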
Optimization
Limiting the Number of Task Splits
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=30 /wordcount/input /wordcount/output/result1
From the job output we can also see a total of 6 map tasks and 1 reduce task (screenshot omitted).
Enabling Uber Mode
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=30 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result2
This time the job output contains three Uber-related counters:

| Counter | Description |
| --- | --- |
| TOTAL_LAUNCHED_UBERTASKS | Number of tasks launched in Uber mode |
| NUM_UBER_SUBMAPS | Number of map tasks run as Uber sub-tasks |
| NUM_UBER_SUBREDUCES | Number of reduce tasks run as Uber sub-tasks |
From these counters we know that all 7 tasks ran in Uber mode: 6 map tasks and 1 reduce task. The same counters can also be read programmatically, as sketched below.
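A minimal client-side sketch, assuming `job` is the org.apache.hadoop.mapreduce.Job instance that has already completed (the counter names come from the standard JobCounter enum):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class UberCounterCheck {
  // Print the Uber-related counters of a finished job.
  static void printUberCounters(Job job) throws Exception {
    Counters counters = job.getCounters();
    long uberTasks = counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue();
    long uberMaps = counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue();
    long uberReduces = counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES).getValue();
    System.out.println("uber tasks: " + uberTasks
        + ", uber maps: " + uberMaps + ", uber reduces: " + uberReduces);
  }
}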
Other Tests
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=20 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result3
We can see that the input is now indeed divided into 9 splits (remaining output omitted). Since 9 is still within the default maxmaps limit of 9, the job is still uberized. Now shrink the split size by one more byte:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=19 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result4
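A note on the arithmetic behind these runs: FileInputFormat computes the split size as max(minSize, min(maxSize, blockSize)) and the number of map tasks is then roughly ceil(fileSize / splitSize). The sketch below reproduces the observed counts under the assumption of a 180-byte input file; that size is a hypothetical value consistent with the 6 and 9 splits seen above, not taken from the original test, and the sketch ignores FileInputFormat's 10% slop tolerance on the last split:

public class SplitMathSketch {
  // Simplified version of FileInputFormat's split-size formula.
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long fileSize = 180;          // hypothetical input size (see lead-in above)
    long blockSize = 128L << 20;  // 128 MB default block
    long minSize = 1;
    for (long maxSize : new long[] {30, 20, 19}) {
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);
      long splits = (fileSize + splitSize - 1) / splitSize;  // ceiling division
      System.out.println("split.maxsize=" + maxSize + " -> " + splits + " map tasks");
    }
  }
}

Under this assumption, maxsize=19 yields 10 map tasks, which exceeds the default maxmaps limit of 9, so the fourth run should no longer be uberized.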
Source Code Analysis
The decision process starts in the serviceStart method of MRAppMaster:

protected void serviceStart() throws Exception {
  // ... unrelated code omitted ...
  job = createJob(getConfig(), forcedState, shutDownMessage);
  // ... unrelated code omitted ...
  if (!errorHappenedShutDown) {
    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
    jobEventDispatcher.handle(initJobEvent);
    // ... unrelated code omitted ...
    if (job.isUber()) {
      speculatorEventDispatcher.disableSpeculation();
    } else {
      dispatcher.getEventHandler().handle(
          new SpeculatorEvent(job.getID(), clock.getTime()));
    }
  }
}
The execution steps of serviceStart are:
- Call the createJob method to create a JobImpl instance.
- Send a JOB_INIT event, which is then dispatched to the job for handling (the dispatcher pattern is sketched after this list).
- Perform an extra action specific to Uber mode: once the four conditions for running in Uber mode are satisfied, speculative execution is disabled; otherwise a SpeculatorEvent is emitted to enable it.
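To make step 2 concrete, here is a tiny, self-contained demo of YARN's asynchronous dispatcher pattern; it uses the real org.apache.hadoop.yarn.event.AsyncDispatcher, but with toy event types rather than MRAppMaster's:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

public class DispatcherDemo {
  enum DemoEventType { JOB_INIT }

  static class DemoEvent extends AbstractEvent<DemoEventType> {
    DemoEvent(DemoEventType type) { super(type); }
  }

  public static void main(String[] args) throws Exception {
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(new Configuration());
    // Handlers are registered per event-type class, just as MRAppMaster
    // registers its JobEventDispatcher for JobEventType.
    dispatcher.register(DemoEventType.class, new EventHandler<DemoEvent>() {
      @Override
      public void handle(DemoEvent event) {
        System.out.println("handling " + event.getType());
      }
    });
    dispatcher.start();
    dispatcher.getEventHandler().handle(new DemoEvent(DemoEventType.JOB_INIT));
    Thread.sleep(500);  // give the dispatcher thread time to drain the queue
    dispatcher.stop();
  }
}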
The createJob method is implemented as follows:

protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  // create single job
  Job newJob = new JobImpl(jobId, appAttemptID, conf,
      dispatcher.getEventHandler(), taskAttemptListener,
      jobTokenSecretManager, jobCredentials, clock,
      completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
      currentUser.getUserName(), appSubmitTime, amInfos, context,
      forcedState, diagnostic);
  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
  dispatcher.register(JobFinishEvent.Type.class,
      createJobFinishEventHandler());
  return newJob;
}
From this code we can see that a JobImpl object is created. JobImpl maintains its own state machine (for the implementation principles of state-machine transitions, see the article 《Hadoop2.6.0中YARN底层状态机实现分析》), which performs state transitions and triggers actions when events arrive; a freshly created JobImpl starts in the JobStateInternal.NEW state. Finally, the JobImpl object is put into the Map<JobId, Job> cache held by RunningAppContext, the implementation class of the AppContext interface. A self-contained sketch of this state-machine pattern follows.
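The toy below is modeled on JobImpl's NEW --JOB_INIT--> INITED/FAILED transition; it uses the real org.apache.hadoop.yarn.state.StateMachineFactory, but the enums and the transition body are simplified stand-ins rather than Hadoop's actual classes:

import java.util.EnumSet;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

public class MiniJob {
  enum State { NEW, INITED, FAILED }
  enum EventType { JOB_INIT }

  static class Event extends AbstractEvent<EventType> {
    Event(EventType type) { super(type); }
  }

  // One multi-arc transition: from NEW, a JOB_INIT event may land in INITED
  // or FAILED, mirroring how JobImpl's InitTransition reports its outcome.
  private static final StateMachineFactory<MiniJob, State, EventType, Event> factory =
      new StateMachineFactory<MiniJob, State, EventType, Event>(State.NEW)
          .addTransition(State.NEW, EnumSet.of(State.INITED, State.FAILED),
              EventType.JOB_INIT,
              new MultipleArcTransition<MiniJob, Event, State>() {
                @Override
                public State transition(MiniJob job, Event event) {
                  // JobImpl would call makeUberDecision and create tasks here.
                  return State.INITED;
                }
              })
          .installTopology();

  private final StateMachine<State, EventType, Event> stateMachine = factory.make(this);

  public static void main(String[] args) throws Exception {
    MiniJob job = new MiniJob();
    job.stateMachine.doTransition(EventType.JOB_INIT, new Event(EventType.JOB_INIT));
    System.out.println(job.stateMachine.getCurrentState()); // INITED
  }
}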
The jobEventDispatcher used in serviceStart is an instance of the inner class JobEventDispatcher:

private class JobEventDispatcher implements EventHandler<JobEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(JobEvent event) {
    ((EventHandler<JobEvent>) context.getJob(event.getJobId())).handle(event);
  }
}
Its handle method looks up the JobImpl object from RunningAppContext, the implementation class of AppContext:
@Override
public Job getJob(JobId jobID) {
  return jobs.get(jobID);
}
The event then reaches JobImpl's own handle method:

public void handle(JobEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
  }
  try {
    writeLock.lock();
    JobStateInternal oldState = getInternalState();
    try {
      getStateMachine().doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      LOG.error("Can't handle this event at current state", e);
      addDiagnostic("Invalid event " + event.getType() + " on Job " + this.jobId);
      eventHandler.handle(new JobEvent(this.jobId, JobEventType.INTERNAL_ERROR));
    }
    //notify the eventhandler of state change
    if (oldState != getInternalState()) {
      LOG.info(jobId + "Job Transitioned from " + oldState + " to "
          + getInternalState());
      rememberLastNonFinalState(oldState);
    }
  } finally {
    writeLock.unlock();
  }
}
Its execution steps are:
- Acquire the write lock that guards modifications to the JobImpl instance;
- Record the current state of the JobImpl instance;
- Let the state machine perform the transition;
- Release the write lock.
The getInternalState method returns the current state of the JobImpl instance; its implementation is:
@Private
public JobStateInternal getInternalState() {
  readLock.lock();
  try {
    if (forcedState != null) {
      return forcedState;
    }
    return getStateMachine().getCurrentState();
  } finally {
    readLock.unlock();
  }
}
When the JobImpl state machine performs this transition, the JobEvent being processed has type JobEventType.JOB_INIT, so the transition ultimately invokes the transition method of InitTransition. (For the implementation principles of state-machine transitions, see 《Hadoop2.6.0中YARN底层状态机实现分析》.)
The key code in InitTransition's transition method for Uber mode is:
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
  // ... unrelated code omitted ...
  job.makeUberDecision(inputLength);
  // ... unrelated code omitted ...
}
The makeUberDecision method evaluates all of the conditions listed earlier:

private void makeUberDecision(long dataInputLength) {
  //FIXME: need new memory criterion for uber-decision (oops, too late here;
  // until AM-resizing supported,
  // must depend on job client to pass fat-slot needs)
  // these are no longer "system" settings, necessarily; user may override
  int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
  int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
  long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
      fs.getDefaultBlockSize(this.remoteJobSubmitDir));
  // FIXME: this is wrong; get FS from [File?]InputFormat and default block
  // size from that
  long sysMemSizeForUberSlot =
      conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
  long sysCPUSizeForUberSlot =
      conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
  boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
  boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
  boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
  boolean smallInput = (dataInputLength <= sysMaxBytes);
  // ignoring overhead due to UberAM and statics as negligible here:
  long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
  long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
  long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
  int requiredMapCores = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
      MRJobConfig.DEFAULT_MAP_CPU_VCORES);
  int requiredReduceCores = conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
      MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
  int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
  if (numReduceTasks == 0) {
    requiredMB = requiredMapMB;
    requiredCores = requiredMapCores;
  }
  boolean smallMemory = (requiredMB <= sysMemSizeForUberSlot)
      || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
  boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
  boolean notChainJob = !isChainJob(conf);

  // User has overall veto power over uberization, or user can modify
  // limits (overriding system settings and potentially shooting
  // themselves in the head). Note that ChainMapper/Reducer are
  // fundamentally incompatible with MR-1220; they employ a blocking
  // queue between the maps/reduces and thus require parallel execution,
  // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
  // and thus requires sequential execution.
  isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
      && smallInput && smallMemory && smallCpu && notChainJob;

  if (isUber) {
    LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
        + numReduceTasks + "r tasks (" + dataInputLength
        + " input bytes) will run sequentially on single node.");
    // make sure reduces are scheduled only after all map are completed
    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
    // uber-subtask attempts all get launched on same node; if one fails,
    // probably should retry elsewhere, i.e., move entire uber-AM: ergo,
    // limit attempts to 1 (or at most 2?  probably not...)
    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    // disable speculation
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
  } else {
    StringBuilder msg = new StringBuilder();
    msg.append("Not uberizing ").append(jobId).append(" because:");
    if (!uberEnabled)
      msg.append(" not enabled;");
    if (!smallNumMapTasks)
      msg.append(" too many maps;");
    if (!smallNumReduceTasks)
      msg.append(" too many reduces;");
    if (!smallInput)
      msg.append(" too much input;");
    if (!smallCpu)
      msg.append(" too much CPU;");
    if (!smallMemory)
      msg.append(" too much RAM;");
    if (!notChainJob)
      msg.append(" chainjob;");
    LOG.info(msg.toString());
  }
}
When the job is uberized, makeUberDecision additionally:
- Schedules the reduce task only after all map tasks have completed (mapreduce.job.reduce.slowstart.completedmaps is set to 1.0).
- Sets the job's maximum map attempts (mapreduce.map.maxattempts) and maximum reduce attempts (mapreduce.reduce.maxattempts) both to 1.
- Disables speculative execution of the job's map tasks (mapreduce.map.speculative set to false) and reduce tasks (mapreduce.reduce.speculative set to false).
Postscript: my book 《深入理解Spark:核心思想与源码分析》 has now been officially published and is on sale at JD, Dangdang, Tmall, and other sites; interested readers are welcome to pick up a copy.
JD: http://item.jd.com/11846120.html
Dangdang: http://product.dangdang.com/23838168.html