How MapReduce Works
转自:http://www.cnblogs.com/ggjucheng/archive/2012/04/23/2465820.html
一、从Map到Reduce
MapReduce其实是分治算法的一种实现,其处理过程亦和用管道命令来处理十分相似,一些简单的文本字符的处理甚至也可以使用Unix的管道命令来替代,从处理流程的角度来看大概如下:
cat input | grep | sort | uniq -c | cat > output # Input -> Map -> Shuffle & Sort -> Reduce -> Output
简单的流程图如下:
对于Shuffle,简单地说就是将Map的输出通过一定的算法划分到合适的Reducer中进行处理。Sort当然就是对中间的结果进行按key排序,因为Reducer的输入是严格要求按key排序的。
Input->Map->Shuffle&Sort->Reduce->Output只是从宏观的角度对MapReduce的简单描述,实际在MapReduce的框架中,即从编程的角度来看,其处理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用之前的对温度进行统计的例子来讲述这些过程。
Input Phase
输入的数据需要以一定的格式传递给Mapper的,格式有多种,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat来设置,这个过程还应该包括对输入的数据进行任务粒度划分(split)然后再传递给Mapper。在温度的例子中,由于处理的都是文本数据,输入的格式使用默认的TextInputFormat即可。
Map Phase
对输入的key、value对进行处理,输出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass设置自己的Mapper。在例子中,将(行号、温度的文本数据)作为key/value输入,经过处理后,从温度的文件数据中提取出日期中的年份和该日的温度数据,形成新的key/value对,最后以list(年, 温度)的结果输出,如[(1950, 10), (1960, 40), (1960, 5)]。
Sort Phase
对Mapper输出的数据进行排序,可以通过JobConf.setOutputKeyComparatorClass来设置自己的排序规则。在例子中,经过排序之后,输出的list集合是按年份进行排序的list(年, 温度),如[(1950, 10), (1950, 5), (1960, 40)]。
Combine Phase
这个阶段是将中间结果中有相同的key的<key, value>对合并成一对,Combine的过程与Reduce很相似,使用的甚至是Reduce的接口。通过Combine能够减少<key, value>的集合数量,从而减少网络流量。Combine只是一个可选的优化过程,并且无论Combine执行多少次(>=0),都会使Reducer产生相同的输出,使用JobConf.setCombinerClass来设置自定义的Combine Class。在例子中,假如map1产生出的结果为[(1950, 0), (1950, 20), (1950, 10)],在map2产生出的结果为[(1950, 15), (1950, 25)],这两组数据作为Reducer的输入并经过Reducer处理后的年最高温度结果为(1950, 25),然而当在Mapper之后加了Combine(Combine先过滤出最高温度),则map1的输出是[(1950, 20)]和map2的输出是[(1950, 25)],虽然其他的三组数据被抛弃了,但是对于Reducer的输出而言,处理后的年最高温度依然是(1950, 25)。
Partition Phase
把Mapper任务输出的中间结果按key的范围划分成R份(R是预先定义的Reduce任务的个数),默认的划分算法是”(key.hashCode() & Integer.MAX_VALUE) % numPartitions”,这样保证了某一范围的key一定是由某个Reducer来处理,简化了Reducer的处理流程,使用JobConf.setPartitionClass来设置自定义的Partition Class。在例子中,默认就自然是对年份进行取模了。
Reduce Phase
Reducer获取Mapper输出的中间结果,作为输入对某一key范围区间进行处理,使用JobConf.setReducerClass来设置。在例子中,与Combine Phase中的处理是一样的,把各个Mapper传递过来的数据计算年最高温度。
Output Phase
Reducer的输出格式和Mapper的输入格式是相对应的,当然Reducer的输出还可以作为另一个Mapper的输入继续进行处理。
二、Details of Job Run
上面只是从task运行中描述了Map和Reduce的过程,实际上当从运行”hadoop jar”开始还涉及到很多其他的细节。从整个Job运行的流程来看,如下图所示:
从上图可以看到,MapReduce运行过程中涉及有4个独立的实体:
- Client,用于提交MapReduce job。
- JobTracker,负责协调job的运行。
- TaskTrackers,运行 job分解后的多个tasks,task主要是负责运行Mapper和Reducer。
- Distributed filesystem,用于存储上述实体运行时共享的job文件(如中间结果文件)。
Job Submission
当调用了JobClient.runJob()之后,Job便开始被提交了,在Job提交这个步骤中,经历了以下的过程:
- Client向JobTacker申请一个新的job ID(step 2),job ID形如job_200904110811_0002的格式,是由JobTracker运行当前的job的时间和一个由JobTracker维护的自增计数(从1开始)组成的。
- 检查job的output specification,比如输出目录是否已经存在(存在则抛异常)、是否有权限写等等。
- Computes the input splits for the job,这些input splits就是作为Mapper的输入。
- Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker’s filesystem in a direcotry named after the job ID(step 3)。
- Tells the jobtracker that the job is ready for execution(step 4)。
Job Initialization
当JobTracker收到Job提交的请求后,将job保存在一个内部队列,并让Job Scheduler处理并初始化。初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的根据(step 5)。当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(step 6),然后再为每个split创建map task。
Task Assignment
TaskTrackers会使用一个简单的loop为定期向JobTracker发送heartbeat调用,发送的间隔时间大约5秒,一般取决于集群服务器的规模和繁忙程度以及网络拥挤程度。这个heartbeat一方面是告知JobTracker当前TaskTracker处于live状态,同时是用于JobTracker和TaskTracker进行通信,TaskTracker会根据heartbeat的返回值来执行一定的操作(step 7)。
To choose a reduce task the JobTracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. For a map task, however, it takes account of the TaskTracker’s network location and picks a task whose input splits is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is , running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split.
Task Execution
当TaskTrack被分配到一个task之后,接下来就是运行这个task。首先,它会需要的job JAR文件从shared filesystem拷贝到local filesystem,然后创建一个working direcotry并un-jars拷贝的JAR文件到该directory,最后就创建一个TaskRunner对象运行task。
TaskRunner在运行的时候是启动了一个新的JVM来run each task(step 10),这样是为了防止在用户自定义的Mapper出现异常令JVM挂了,从而连累到TaskTracker。TaskRunner子进程会使用umbilical接口和TaskTracker通信并每隔几秒向TaskTracker汇报进度。
对于使用Streaming和Pipes方式来创建的Mapper,也是作为TaskTracker的子进程来运行的。Streaming是使用标准输入输出来通信,而Pipes是使用socket来进行通信,如下图:
Progress and Status Updates
进度和状态是通过heartbeat来更新和维护的。来对于Map Task,进度就是已处理数据和所有输入数据的比例。对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、Reduce调用,每部分占1/3。
Job Completion
当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为Successful,同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)。
How MapReduce Works相关推荐
- MapReduce英语面试
1--What's Mapreduce.(How does Mapreduce works?) Mapreduce is a progarmming model to process data pro ...
- Hadoop的学习笔记(Hive|pig|zookeeper|hbase)
轉載的,此筆記的鏈接地址請點擊此處 hadoop笔记本 <div class="postText"><div id="cnblogs_post_body ...
- 读书笔记《Hadoop权威指南第4版(Hadoop The Definitive Guide 4th)》
Chapter 1 Meet Hadoop Data Storage and Analysis The problem is simple: although the storage capaciti ...
- hadoop调用python算法_使用Python实现Hadoop MapReduce程序
根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序, 打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...
- MapReduce with MongoDB and Python[ZT]
MapReduce with MongoDB and Python 从 Artificial Intelligence in Motion 作者:Marcel Pinheiro Caraciolo ( ...
- 使用Python实现Hadoop MapReduce程序
根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序, 打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...
- 【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce
场景:将Python程序通过hadoop-streaming提交到Hadoop集群执行. 参考:http://www.michael-noll.com/tutorials/writing-an-had ...
- PigPen:用Clojure写MapReduce Introducing PigPen: Map-Reduce for Clojure
It is our pleasure to release PigPen to the world today. PigPen is map-reduce for Clojure. It compil ...
- MapReduce V1:Job提交流程之JobTracker端分析
我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTr ...
最新文章
- BZOJ 2257: [Jsoi2009]瓶子和燃料
- winfrom弹出窗口用timer控件控制倒计时20秒后关闭
- PAT_B_1002_Java(20分)
- 【hash】Seek the Name, Seek the Fame
- 跟我一起屏蔽百度搜索页面右侧的内容
- MySQL学习笔记_5_SQL语言的设计与编写(上)
- Linux CentOS 编译LUA。。搞半天终于对了= =
- Java 算法 阿尔法乘积
- python后台架构Django教程——项目配置setting
- gis 空间分析 鸟类栖息地选取_鸟类的栖息地选择
- 用vector实现通用堆栈的类模板
- linux-什么是Linux系统?linux详解Linux与Windows的区别Linux发行版本及特点介绍
- 快手进军元宇宙:数字人主播拉动“三驾马车”
- 10个国外设计网站(自学设计的童鞋建议收藏)
- android UI设计图片和文字尺寸px对应dp、sp值换算
- 解决生产计划排程APS系统七大问题,提升企业生产效率!
- stm32 mp3软件音频解码案例分析流程(一)
- Linux核心命令汇总(思维导图+实例讲解)
- 基于AutoJs的94自动运行脚本
- 自适应流媒体传输-fmp4
热门文章
- python正则匹配ip地址_python 正则表达式匹配IP地址
- matplotlib 中文_看了这个总结,其实 Matplotlib 可视化,也没那么难
- odbc mysql导出access_将mysql数据导入access数据库
- 中国计算机游戏竞赛,信任与欺骗的游戏:全球计算机游戏程序竞赛,诠释了《老子通释》...
- lisp封装成vla函数_[良心教程]分享最新最实用的按键精灵封装函数
- 怎么在服务器添加充值网站,云服务器怎么弄充值
- mysql or 短路_MySQL是否使IF()函数短路?
- php+mysql开发实战 pdf_《PHP + MySQL 开发实战》怎么样_目录_pdf在线阅读 - 课课家教育...
- ubuntu18 mysql5.6源码_Ubuntu 18.04 使用Systemd管理MySQL 5.6
- easyui 日期控件增加清空按钮