1. Hadoop MapReduce简介

Hadoop MapReduce是一个使用简便的软件框架,是Google云计算模型MapReduce的Java开源实现,基于它写出来的应用程序能够运行在由上千万台普通机器注册的大型集群系统中,并以一种可靠地、容错的方式并行处理上T级别的数据集。

Hadoop MapReduce基本思想:一个MapReduce作业通常会把输入的数据集合切分为若干独立的数据块,由Map任务并行的方式处理。该框架会对Map的输出先进行排序,然后把结果输出作为Reduce任务的输入。通常作用的输入和输出都会存储在文件系统中。

1.1 系统架构

在系统架构上,MapReduce框架是一种主从架构,由一个单独的JobTracker节点和多个TaskTracker节点共同组成。

  1. JobTracker是MapReduce的Master,负责调度构成一个作业的所有任务,这些任务分布在不同 的TaskTracker节点上,监控它们的执行,重新执行已经失败的任务,同时提高状态和诊断信息给作业客户端。

  2. TaskTracker是MapReduce的Slave,仅负责运行由Master指派的任务执行。

Hadoop的作业客户端提交作业(jar包和可执行程序)和配置信息作为Master的JobTracker,JobTracker负责分发用户程序和配置信息给集群中的TaskTracker,以及调度任务并监控他们的执行,同时提供状态和诊断信息给作业客户端。

2. MapReduce模型

2.1 MapReduce编程模型

对于Map函数,处理输入的键值对,并且产生一组中间的键值对。MapReduce框架收集所有相同的中间键值的键值对,并且发送给Reduce函数进行处理。对于Reduce函数,它处理中间键的键值对,以及这个中间键值对相关的值集合。此函数合并这些值,最后形成一个相对较少的值集合。

2.2 MapReduce实现原理

  1. 用户程序中的MapReduce函数库首先把输入文件分成M块(Hadoop默认128M),接着在集群机器中执行处理程序。
  2. 主控程序master分配Map任务和Reduce任务给工作执行机器Worker。总该有M个任务和R个reduce任务需要分配。master会选择空闲的Worker并且分配这些Map任务或者Reduce任务给Worker节点。
  3. 一个分配了Map任务的worker读取并处理相关输入的数据块。从输入的数据片段中解析key/value键值对,然后把key/value键值对传递给用户自定义的map函数,map函数生成并输出key/vaule键值对集合,这些集合会暂时缓存内存中。
  4. 缓存中的key/value键值对通过分区函数分成R个区域,之后周期性地写入本地磁盘上。同时缓存key/value键值对集合在本地磁盘上的存储位置发送给master节点,由master再把这些记录传送给Reduce worker。
  5. 当Reduce worker程序接收到master程序发送过来的数据存储位置信息之后,使用RPC从Map worker所在的主机磁盘上读取这些缓存数据。在Reduce Worker读取了所有的中间数据之后,通过key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。
  6. Reduce Worker程序遍历排序后的中间数据。对于每一个唯一的中间key值,Reduce Worker程序都会将这个key值和它相关的中间vaule值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件中。

3. 计算流程与机制

3.1 Hadoop作业提交和初始化

MapReduce的Master接收到客户端所提交的作业后首先要完成的就是将作业初始化为map任务和reduce任务,然后就是等待JobTracker调度执行。

  1. 命令行提交。用户使用Hadoop命令行脚本提交Mapreduce程序到集群中。
  2. 作业上次。在提交作业到JobTracker之前还需要完成相关的初始化工作。这些工作包括用户作业的JobId,创建HDFS目录,上传作业、相关依赖库、需要分发的文件到HDFS中,同时还包括用户输入数据的所有分片信息。
  3. 产生分片文件。在作业提交之后,JobClient调用InputFormat的getSplit()方法产生用户数据的split分片信息,这些信息包括Input元数据信息,原始切分数据信息,其中元数据信息会被JobTracker使用,原始切分信息会在Map任务初始化的时候获取自己需要处理的数据信息。这两部分数据被保存在job.split文件和job.splitmetainfo文件中
  4. 提交作业到JobTracker。JobClient通过远程调用RPC将作业提交到JobTracker作业调度器中,首先为作业创建JobInprogress对象。JobTracker会为用户提交的每一个作业创建JobInprogress对象,这个对象维护了作业的运行时的信息,主要用于跟踪正在运行的任务的状态和进度。其次,检查用户是否具有指定队列的作业提交权限。

通过上面的操作就完成了MapReduce作业的提交工作了,那么接下来就开始作业的初始化操作:作业的初始化操作主要指的就是构造MapTask和ReduceTask并且对他们进行初始化操作,这一步操作主要是调度器JobTracker.initJob()方法来进行的。具体情况是Hadoop将每个作业分成4个不同类型的任务:Setup Task、Map Task、Reduce Task、Cleanup Task。

3.2 Mapper

Mapper是MapReduce框架给用户暴露的Map编程接口,用户在实现自己的Mapper类时需要继承这个基类。执行Map Task任务:将输入键值对(key/value pair)映射到一组中间格式的键值对集合。

处理流程如下:

  1. 通过InputFormat接口获得InputSplit的实现,然后对输入的数据切分。每一个Split分块对应一个Mapper任务。
  2. 通过RecordReader对象读取生成<k,v>键值对。Map函数接受数据并处理后输出<k1,v1>键值对。
  3. 通过context.collect方法写入context对象中。当键值对集中被收集后,会被Partition类中的partition()函数以指定方式区分并写入输出缓冲区(系统默认的是HashPartitioner),同时调用sort()进行排序。
  4. 如果用户指定了Combiner,则会将键值对进行combine合并(相当于map端的reduce),输出到reduce写入文件。

3.3 Reducer

Reducer将与一个key关联的一组中间数值集归约为一个更小的数值集。

  1. Shuffle阶段。框架通过HTTP协议为每个Reducer获得所有Mapper输出中与之相关的分块,这一阶段也称混洗阶段,所做的大量操作就是数据复制,因此也可以称为数据复制阶段。
  2. Sort阶段。 框架按照key的值对Reducer的输入进行分组(因为不同的Mapper输出可能会有相同的key)。 Shuffle和Sort是同时进行的,Map的输出也是一边被取回一边被合并的。 如果需要改变分组方式,则需要指定一个Compartor,实现二次排序(后面会介绍)。
  3. Reduce阶段。 调用Reduce()函数,对Shuffle和sort得到的<key,(list of values)>进行处理,输出结果到DFS中。

3.4 Reporter和OutputCollector

Report是用于MapReduce应用程序的报告进度,设定应用级别的状态信息,更新Counters(计数器)的机制。Master和Reducer的处理情况可以利用Reporter来报告进度或者表明自己是运行正常的

OutputCollector是一个由Map/Reduce框架提供的,用于收集Mapper或者Reducer输出数据的通用机制。

4. MapReduce的输入/输出格式

MapReduce计算框架本质上是一种基于磁盘的批处理并行计算系统,每一轮MapReduce作业都需要从分布式文件系统中读取数据,处理之后再写入分布式文件系统。其他涉及到很多I/O操作,这些操作包括内存到磁盘、磁盘到内存,以及节点之间的数据交换。

4.1 输入格式

  1. 检查作业输入的有效性。
  2. 把输入文件切分成多个逻辑InputSplit实例,并把每个实例分发给一个Mapper(一对一);FileSplit是默认的InputSplit,通过write(DataOutput out)和readFields(DataInput in)两种方法进行序列化和反序列化。
  3. 提供RecordReader实现。这个RecordReader从逻辑InputSplit中获取输入记录,这些记录将由Mapper处理,Mapper利用该实现从InputSplit中读取<k,v>键值对。

TextInputFormat

TextInputFormat 用于读取纯文本文件。

KeyValueTextInputFormat

KeyValueTextInputFormat同样是用于读取文本文件。

NLineInputForamt

NLineInputForamt可以将文件以行为单位进行split切分,比如文件中的每一行对应的一个Map。

SequenceFileInputFormat

SequenceFileInputFormat用于读取SequenceFile。

4.2 输出格式

Hadoop中的OutputFormat用来描述MapReduce作业的输出格式:

  1. 检验作业的输出。
  2. 验证输出结果类型是否如在Config中所配置的。默认的OutputFormat是TextOutputFormat
  3. 提供一个RecordWriter的实现,用来输出作业结果。RecordWriter生成的<key,value>键值对输出到文件。

TextOutputFormat

TextOutputFormat是Hadoop默认的输出格式。

SequenceFileOutputFormat

SequenceFileOutputFormat用于就是输出到Hadoop中的SequenceFile文件格式。

MapFileOutputFormat

指定MapFileOutputFormat输出类型可以将数据输出为Hadoop中的MapFile文件格式。

MultipleOutputFormat

MultipleOutputFormat是Hadoop中的多路输出处理类,通过这个类可以实现根据key将记录控制输出到不同的文件。

5. 核心问题

5.1 Map和Reduce数量问题

MapTask数量

Max.split(100M)
Min.split(10M)
Block(64M)InputSize=Max(min.split,min(max.split,block))
  • Max.split指的是最大InputSplit文件大小
  • Min.split指的是最小InputSplit文件大小
  • Block指的是Block文件大小

其中InputSize的大小是InputSize=Max(min.split,min(max.split,block))

ReduceTask数量

job.setNumReduceTasks(numReduceTask);

通过job设置ReduceTask数量个数。

单个Reduce:

多个Reduce

数量为0(适应于不需要归约和处理的作业)

5.2 作业配置

  • 作业配置的相关设置方法
作业配置方法 功能说明
setNumReduceTasks 设置reduce数目
setNumMapTasks 设置Map数目
setInputFormatClass 设置输入文件格式类
setOutputFormatClass 设置输出文件格式类
setMapperClass 输出Map类
setCombiner 设置Combiner类
setReducerClass 设置Reduce类
setPartitionerClass 设置Partitioner类
setMapOutputKeyClass 设置Map输出的Key类
setMapOutputValueClass 设置Map输出的Value类
setCompressMapOutput 设置Map输出是否压缩
setOutputValueClass 设置输出value类
setJobName 设置作业名字
setSpeculativeExecution 设置是否开启预防性执行
setMapSpeculativeExecution 设置是否开启Map任务的预防性执行
setReduceSpeculativeExecution 设置是否开启Reduce任务的预防性执行

5.3 作业的容错机制

MapReduce作为一个通用的并行计算框架,有着非常健壮的容错机制,在不同的粒度上均有考虑。

再执行

用户的一个MapReduce作业往往是由很多任务组成的,只有所有的任务执行处理完毕之后才算整个作业成功。对于任务的容错机制,MapReduce采用的最简单的方法进行处理,即“再执行”,也就是对于失败的任务重新调度执行一次。一般有以下两种情况需要再执行:

  1. 如果是Map任务或者Reduce任务失败了,那么调度器就会将这个失败的任务分配到其他节点重新执行。
  2. 如果是一个节点死掉了,那么在这台死机的节点上已经完成了运行的map
    任务以及正在运行中的Map和Reduce任务都将调度重新执行,同时在其他机器上正在运行的Reduce任务也将被重新执行。

推测式执行

在mapreduce中,影响一个作业的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长时间才完成最后几个Map或者Reduce任务,导致mapReduce任务执行时间超过预期。出现“落伍者”的原因很多,CPU、内存、本地磁盘和网络带宽等因素都会导致某些Map或者reduce任务执行效率更加缓慢。

所谓推测式执行策略就是MapReduce对每一个任务都计算它的进度,如果一个任务的进度远远慢于其他的任务时,那么这个任务便可以被认为是一个“落伍者”。在发现一个“落伍者”之后,调度器就会在其他节点上重新调度这任务以便重新执行。在这个时候,一般会有两个相同的任务在同时执行,最终先完成的那个任务就算成功了,而没有完成的那个任务就会被杀死。

5.4 作业调度

调度的功能是将各种类型的作业在调度算法作用下分配给Hadoop集群中的计算节点,从而达到 分布式和并行计算 的目的。
调度算法模块中至少涉及两个重要流程:1.作业的选择 2.任务的分配。

调度过程 :

1)MapReduce框架中作业通常是通过JobClient.runJob(job)方法提交到JobTracker,JobTracker接收到JobClient的请求后将其加入作业调度队列中。

2)然后JobTracker一直等待JobClient通过RPC向其提交作业,而TaskTracker则一直通过RPC向JobTracker发送心跳信号询问是否有任务可执行,有则请求JobTracker派发任务给它执行。

3)如果JobTracker的作业队列不为空,则TaskTracker发送的心跳将会获得JobTracker向它派发的任务。
这是一个主动请求的任务:slave的TaskTracker主动向master的JobTracker请求任务。

4)当TaskTracker接到任务后,通过自身调度在本slave建立起Task,执行任务。


常用调度器 主要包括:JobQueueTaskScheduler(FIFO调度器),CapacityScheduler(容量调度器),Fair Scheduler(公平调度器)等。

  • FIFO调度器:基本思想是作业按照先后顺序统一放入一个队列中,然后根据优先级按照时间先后顺序依次执行,总体遵循先进先出的基本调度策略
  • 容量调度器:计算能力调度器,是雅虎结合自己的集群业务类型提出的一种调度策略。这种调度策略支持多种队列,每个队列可以单独配置一定的资源量,每个队列采取FIFO策略
  • 公平调度器:FaceBook开发的贡献给开源社区的,可以是多种作业并行执行并共享资源池。公平调度器的目的就是为了保证在多用户、多作业类型的情况下保证整个集群的资源利用率,同时可以让所有用户公平地共享整个集群资源。

6. MapReduce特性

6.1 计数器(Counters)

MapReduce Counter可以为我们提供一个观察MapReduce Job运行期中的各个细节数据视图。通过这些Counter计数器我们可以从全局视角来审查程序的运行情况,以及做出错误诊断进行相应处理。

6.2 DistributedCache

DistributedCache是MapReduce计算框架提供的功能,能够缓存应用程序所需要的文件(包括文本、档案文件、jar文件等)。可以将具体应用相关的、大尺寸的、只读的文件有效地分发到各个计算机中,应用程序只需要在JobConf中通过url(hdfs://)指定需要缓存的文件。

MapReduce框架在作业的所有任务执行之前会把必要的文件复制到slave节点上。它运行高效的原因是因为每个作业的文件只复制一次并且为那些没有文档的slave文件缓存文档。

6.3 Tool

6.4 Profiling

6.5 数据压缩

Hadoop详解(六):MapReduce计算框架详解相关推荐

  1. 大数据-MapReduce计算框架

    导语   MapReduce作为Hadoop核心编程模型,在Hadoop中,数据处理的核心就是MapReduce程序设计模型.下面就来分享一下MapReduce都有那些值得我们注意的事情. 文章目录 ...

  2. Twister: 迭代MapReduce计算框架

    摘要:MapReduce编程模型已经简化了许多数据并行应用的实现.编程模型的简化和MapReduce实现提供的服务质量在分布式计算社区上吸引了很多的热情.把MapReduce应用到多种科学应用的这么多 ...

  3. MapReduce 计算框架如何运作

    learn from 从0开始学大数据(极客时间) 1. MapReduce 作业启动和运行机制 作业涉及三类关键进程: 大数据应用进程 这类进程是启动 MapReduce 程序的主入口,主要是指定 ...

  4. Big Data(七)MapReduce计算框架(PPT截图)

    一.为什么叫MapReduce? Map是以一条记录为单位映射 Reduce是分组计算 转载于:https://www.cnblogs.com/littlepage/p/11156617.html

  5. matlab求最小范数解,python中计算最小范数解或伪逆解最精确的方法是什么?

    我的目标是解决:Kc=y 对于伪逆(即最小范数解): ^{pr2}$ 这样模型(希望)是高次多项式模型f(x) = sum_i c_i x^i.我特别感兴趣的是我们有更多的多项式特征比数据(少方程太多 ...

  6. 【Hadoop】Hadoop生态系列之MapReduce概述及MapReduce任务开发与发布

    上一篇:Hadoop生态系列之HDFS常用Shell命令实践及Java API操作HDFS 这里写目录标题 MapReduce 概述 流程分析 环境搭建 MapReduce任务开发 背景 实现 任务发 ...

  7. Hadoop学习笔记:MapReduce框架详解

    原文:http://blog.jobbole.com/84089/ 原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手, ...

  8. hadoop 学习笔记:mapreduce框架详解

    开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能 ...

  9. mapreduce框架详解

    开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能 ...

最新文章

  1. linux停止客户端,linux – 从客户端打印motd停止ssh登录?
  2. 如何启用SQL Server 2008的FILESTREAM特性
  3. centos yum安装_centos7上yum安装碰到的坑
  4. 【Python爬虫】Scrapy爬虫框架
  5. python怎么退出全屏_wxPython:退出全屏
  6. 网络拓扑发现原理研究
  7. 为什么一用迅雷下东西wifi就上不了网了?限速也没用
  8. ASP.NET Core 引用其他程序集项目里面的 Controller 控制器
  9. linux通过无线网卡上网,Linux使用4G/5G无线网卡模块上网
  10. 微信小程序项目-uniapp黑马优购
  11. 移动流量转赠给好友_手机包月流量用不完:教你如何转赠给好友使用
  12. 《韩非子》——《孤愤》
  13. STM32的BootLoader 从SD卡更新固件
  14. 关于别名(alias)的尴尬
  15. Android列表ListView控件的使用
  16. 第一个C++的程序你好世界
  17. C#设计模式学习笔记:(4)建造者模式
  18. 大厂音视频职位面试题目--今日头条
  19. matlab中polyval与polyvalm函数的区别
  20. 一、python简介(吉多•范罗苏姆:人生苦短,我用python)

热门文章

  1. Centos5搭建vsftpd服务
  2. Haddop学习:(一)序
  3. 卫星定位导航行业的产业链
  4. 户籍改革更进一步!我国将推动户籍准入年限同城化累计互认
  5. 为什么Python是数据科学领域最受欢迎的语言
  6. 腾讯云连续四年登上KVM开源贡献榜,两项技术获评年度核心突破
  7. 华为的发展与管理浅析
  8. C++中list的使用方法及常用list操作总结
  9. android打开项目更换版本,android1.6项目,换成其他包的方法,及修改默认启动虚拟机的版本...
  10. hbuilder打包的app如何设置自动清理缓存_手机空间不足,这些“隐形”文件如何找到删除?...