转自:http://www.cnblogs.com/ggjucheng/archive/2012/04/23/2465820.html

一、从Map到Reduce

MapReduce其实是分治算法的一种实现,其处理过程亦和用管道命令来处理十分相似,一些简单的文本字符的处理甚至也可以使用Unix的管道命令来替代,从处理流程的角度来看大概如下:

cat input | grep |   sort  |  uniq -c | cat > output
# Input -> Map -> Shuffle & Sort -> Reduce -> Output

简单的流程图如下:

对于Shuffle,简单地说就是将Map的输出通过一定的算法划分到合适的Reducer中进行处理。Sort当然就是对中间的结果进行按key排序,因为Reducer的输入是严格要求按key排序的。

Input->Map->Shuffle&Sort->Reduce->Output只是从宏观的角度对MapReduce的简单描述,实际在MapReduce的框架中,即从编程的角度来看,其处理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用之前的对温度进行统计的例子来讲述这些过程。

Input Phase

输入的数据需要以一定的格式传递给Mapper的,格式有多种,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat来设置,这个过程还应该包括对输入的数据进行任务粒度划分(split)然后再传递给Mapper。在温度的例子中,由于处理的都是文本数据,输入的格式使用默认的TextInputFormat即可。

Map Phase

对输入的key、value对进行处理,输出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass设置自己的Mapper。在例子中,将(行号、温度的文本数据)作为key/value输入,经过处理后,从温度的文件数据中提取出日期中的年份和该日的温度数据,形成新的key/value对,最后以list(年,  温度)的结果输出,如[(1950, 10), (1960, 40), (1960, 5)]。

Sort Phase

对Mapper输出的数据进行排序,可以通过JobConf.setOutputKeyComparatorClass来设置自己的排序规则。在例子中,经过排序之后,输出的list集合是按年份进行排序的list(年, 温度),如[(1950, 10), (1950, 5), (1960, 40)]。

Combine Phase

这个阶段是将中间结果中有相同的key的<key, value>对合并成一对,Combine的过程与Reduce很相似,使用的甚至是Reduce的接口。通过Combine能够减少<key, value>的集合数量,从而减少网络流量。Combine只是一个可选的优化过程,并且无论Combine执行多少次(>=0),都会使Reducer产生相同的输出,使用JobConf.setCombinerClass来设置自定义的Combine Class。在例子中,假如map1产生出的结果为[(1950, 0), (1950, 20), (1950, 10)],在map2产生出的结果为[(1950, 15), (1950, 25)],这两组数据作为Reducer的输入并经过Reducer处理后的年最高温度结果为(1950, 25),然而当在Mapper之后加了Combine(Combine先过滤出最高温度),则map1的输出是[(1950, 20)]和map2的输出是[(1950, 25)],虽然其他的三组数据被抛弃了,但是对于Reducer的输出而言,处理后的年最高温度依然是(1950, 25)。

Partition Phase

把Mapper任务输出的中间结果按key的范围划分成R份(R是预先定义的Reduce任务的个数),默认的划分算法是”(key.hashCode() & Integer.MAX_VALUE) % numPartitions”,这样保证了某一范围的key一定是由某个Reducer来处理,简化了Reducer的处理流程,使用JobConf.setPartitionClass来设置自定义的Partition Class。在例子中,默认就自然是对年份进行取模了。

Reduce Phase

Reducer获取Mapper输出的中间结果,作为输入对某一key范围区间进行处理,使用JobConf.setReducerClass来设置。在例子中,与Combine Phase中的处理是一样的,把各个Mapper传递过来的数据计算年最高温度。

Output Phase

Reducer的输出格式和Mapper的输入格式是相对应的,当然Reducer的输出还可以作为另一个Mapper的输入继续进行处理。

二、Details of Job Run

上面只是从task运行中描述了Map和Reduce的过程,实际上当从运行”hadoop jar”开始还涉及到很多其他的细节。从整个Job运行的流程来看,如下图所示:

从上图可以看到,MapReduce运行过程中涉及有4个独立的实体:

  • Client,用于提交MapReduce job。
  • JobTracker,负责协调job的运行。
  • TaskTrackers,运行 job分解后的多个tasks,task主要是负责运行Mapper和Reducer。
  • Distributed filesystem,用于存储上述实体运行时共享的job文件(如中间结果文件)。

Job Submission

当调用了JobClient.runJob()之后,Job便开始被提交了,在Job提交这个步骤中,经历了以下的过程:

  1. Client向JobTacker申请一个新的job ID(step 2),job ID形如job_200904110811_0002的格式,是由JobTracker运行当前的job的时间和一个由JobTracker维护的自增计数(从1开始)组成的。
  2. 检查job的output specification,比如输出目录是否已经存在(存在则抛异常)、是否有权限写等等。
  3. Computes the input splits for the job,这些input splits就是作为Mapper的输入。
  4. Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker’s filesystem in a direcotry named after the job ID(step 3)。
  5. Tells the jobtracker that the job is ready for execution(step 4)。

Job Initialization

当JobTracker收到Job提交的请求后,将job保存在一个内部队列,并让Job Scheduler处理并初始化。初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的根据(step 5)。当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(step 6),然后再为每个split创建map task。

Task Assignment

TaskTrackers会使用一个简单的loop为定期向JobTracker发送heartbeat调用,发送的间隔时间大约5秒,一般取决于集群服务器的规模和繁忙程度以及网络拥挤程度。这个heartbeat一方面是告知JobTracker当前TaskTracker处于live状态,同时是用于JobTracker和TaskTracker进行通信,TaskTracker会根据heartbeat的返回值来执行一定的操作(step 7)。

To choose a reduce task the JobTracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. For a map task, however, it takes account of the TaskTracker’s network location and picks a task whose input splits is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is , running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split.

Task Execution

当TaskTrack被分配到一个task之后,接下来就是运行这个task。首先,它会需要的job JAR文件从shared filesystem拷贝到local filesystem,然后创建一个working direcotry并un-jars拷贝的JAR文件到该directory,最后就创建一个TaskRunner对象运行task。

TaskRunner在运行的时候是启动了一个新的JVM来run each task(step 10),这样是为了防止在用户自定义的Mapper出现异常令JVM挂了,从而连累到TaskTracker。TaskRunner子进程会使用umbilical接口和TaskTracker通信并每隔几秒向TaskTracker汇报进度。

对于使用Streaming和Pipes方式来创建的Mapper,也是作为TaskTracker的子进程来运行的。Streaming是使用标准输入输出来通信,而Pipes是使用socket来进行通信,如下图:

Progress and Status Updates

进度和状态是通过heartbeat来更新和维护的。来对于Map Task,进度就是已处理数据和所有输入数据的比例。对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、Reduce调用,每部分占1/3。

Job Completion

当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为Successful,同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)。

How MapReduce Works相关推荐

  1. MapReduce英语面试

    1--What's Mapreduce.(How does Mapreduce works?) Mapreduce is a progarmming model to process data pro ...

  2. Hadoop的学习笔记(Hive|pig|zookeeper|hbase)

    轉載的,此筆記的鏈接地址請點擊此處 hadoop笔记本 <div class="postText"><div id="cnblogs_post_body ...

  3. 读书笔记《Hadoop权威指南第4版(Hadoop The Definitive Guide 4th)》

    Chapter 1 Meet Hadoop Data Storage and Analysis The problem is simple: although the storage capaciti ...

  4. hadoop调用python算法_使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  5. MapReduce with MongoDB and Python[ZT]

    MapReduce with MongoDB and Python 从 Artificial Intelligence in Motion 作者:Marcel Pinheiro Caraciolo ( ...

  6. 使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  7. 【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce

    场景:将Python程序通过hadoop-streaming提交到Hadoop集群执行. 参考:http://www.michael-noll.com/tutorials/writing-an-had ...

  8. PigPen:用Clojure写MapReduce Introducing PigPen: Map-Reduce for Clojure

    It is our pleasure to release PigPen to the world today. PigPen is map-reduce for Clojure. It compil ...

  9. MapReduce V1:Job提交流程之JobTracker端分析

    我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTr ...

最新文章

  1. BZOJ 2257: [Jsoi2009]瓶子和燃料
  2. winfrom弹出窗口用timer控件控制倒计时20秒后关闭
  3. PAT_B_1002_Java(20分)
  4. 【hash】Seek the Name, Seek the Fame
  5. 跟我一起屏蔽百度搜索页面右侧的内容
  6. MySQL学习笔记_5_SQL语言的设计与编写(上)
  7. Linux CentOS 编译LUA。。搞半天终于对了= =
  8. Java 算法 阿尔法乘积
  9. python后台架构Django教程——项目配置setting
  10. gis 空间分析 鸟类栖息地选取_鸟类的栖息地选择
  11. 用vector实现通用堆栈的类模板
  12. linux-什么是Linux系统?linux详解Linux与Windows的区别Linux发行版本及特点介绍
  13. 快手进军元宇宙:数字人主播拉动“三驾马车”
  14. 10个国外设计网站(自学设计的童鞋建议收藏)
  15. android UI设计图片和文字尺寸px对应dp、sp值换算
  16. 解决生产计划排程APS系统七大问题,提升企业生产效率!
  17. stm32 mp3软件音频解码案例分析流程(一)
  18. Linux核心命令汇总(思维导图+实例讲解)
  19. 基于AutoJs的94自动运行脚本
  20. 自适应流媒体传输-fmp4

热门文章

  1. python正则匹配ip地址_python 正则表达式匹配IP地址
  2. matplotlib 中文_看了这个总结,其实 Matplotlib 可视化,也没那么难
  3. odbc mysql导出access_将mysql数据导入access数据库
  4. 中国计算机游戏竞赛,信任与欺骗的游戏:全球计算机游戏程序竞赛,诠释了《老子通释》...
  5. lisp封装成vla函数_[良心教程]分享最新最实用的按键精灵封装函数
  6. 怎么在服务器添加充值网站,云服务器怎么弄充值
  7. mysql or 短路_MySQL是否使IF()函数短路?
  8. php+mysql开发实战 pdf_《PHP + MySQL 开发实战》怎么样_目录_pdf在线阅读 - 课课家教育...
  9. ubuntu18 mysql5.6源码_Ubuntu 18.04 使用Systemd管理MySQL 5.6
  10. easyui 日期控件增加清空按钮