.MapReduce的思想核心是 而治之, 充分利用了并行处理的优势。
  1. Mapper map()方法是对输入的一个KV对调用一次!!
  2. Reduce Reduce()方法是对相同K的一组KV对调用执行一次
  3. Drive
二.MapReduce原理分析
  1. MapTask运行机制详解:

1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n 作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行 文本内容。
3. 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数,RecordReader读取一行这里调用一次。
4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是根据keyvaluereduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
5. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
     缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。
6、当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!
     如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
     那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
-----------------------------------------------------------------------------------------------------------------------------------------------------------------

maptask的并行度(数量)

注意:默认spitsize == blocksize ==128m  这样的好处是为了所谓的数据本地化或hdfs的短路读取,减少一些没必要的网络资源。

MapTask并行度是不是越多越好呢?(源码中的split_slop=1.1)
答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split.MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;对于1M的切片的Maptask来说,太浪费资源。

   2.ReduceTask 工作机制:

Reduce大致分为copysortreduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中数据merge到磁盘和将磁盘中的数据 进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行 finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
详细步骤
  1. Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求 maptask获取属于自己的文件。
  2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数 值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge 有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的 数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过 程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种 merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge 方式生成最终的文件。
  3. 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

ReduceTask并行度

MapTask的并发数由切片数决定,ReduceTask数量的决定是可以直接手动设置:
注意事项
1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;
3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;
三.shuffle机制
 
map输入到reduce这个阶段的过程称为shuffle;

job对象进入后会有分区:

自定义分区小结:

分区是在map方法输出后在缓冲区内做的,他的参数就是map的输出的参数

1. 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
2. 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
3. 如果reduceTask数量大于分区数量,但是输出多个空文件
4. 如果reduceTask数量小于分区数量,有可能会报错
combiner合并小结
1. Combiner是MR程序中Mapper和Reducer之外的一种组件
2. Combiner组件的父类就是Reducer
3. Combiner和reducer的区别在于运行的位置
4. Combiner是在每一个maptask所在的节点运行;
5. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
6. Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer 的输入kv类型要对应起来。

排序

MapTask
      它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,溢写完毕后,它会对磁盘上所有文件进行归并排序。
      ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排序。
1. 部分排序.
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序
2. 全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置- -个ReduceTask。但该方法在处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
3. 辅助排序: ( GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
4. 二次排序.
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
GroupingComparator 是reduce端的组件:
      GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。
MapReduce读取和输出数据:
   InputFormat:InputFormat是MapReduce框架用来读取数据的类。
   分类:

  1. TextInputFormat (普通文本文件,MR框架默认的读取实现类型)
  2. KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
  3. NLineInputF ormat(读取数据按照行数进行划分分片)
  4. CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
  5. 自定义InputFormat

CombineTextInputFormat:MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论多小,都是单独一个切片, 然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个 MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用 率不高。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。
使用代码

            // 如果不设置InputFormat,它默认用的是TextInputFormat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
            //虚拟存储切片最大值设置4m
            CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

CombineTextInputFormat的原理:切片生成过程分为两部分:虚拟存储过程和切片过程

假设设置setMaxInputSplitSize值为4M
四个小文件:1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M
虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比较,如果不大于设置的最大值,逻辑上划分一个。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此将文件均分成2个虚拟存储块(防止出现太小切片)。 比如如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
1.txt-->2M;2M<4M;一个块;
2.txt-->7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
3.txt-->0.3M;0.3<4M ,0.3M<4M ,一个块
4.txt-->8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
所有块信息:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。
切片过程
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个
切片。
如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,
则虚拟存储之后形成7个文件块,大小分别为:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M
最终会形成3个切片,大小分别为:
(2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
自定义inputformat:
方案:将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
outputformat:
OutputFormat:是MapReduce输出数据的基类,所有MapReduce的数据输出都实现了OutputFormat抽象类。下面我们介绍几种常见的OutputFormat子类
TextOutputFormat
     默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方 法把它们转换为字符串。
SequenceFileOutputFormat
      将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

四 数据压缩机制:

需要压缩的地方:

Map输入端压缩  Map输出端压缩   Reduce端输出压缩
压缩配置:
设置map阶段压缩
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress","true");
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
设置reduce阶段的压缩
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD" );
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.ap ache.hadoop.io.compress.SnappyCodec");
配置文件压缩:
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

   

MapReduce框架相关推荐

  1. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)

    1.概述 MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁.TaskTracker周期性地调用心跳RPC函数,汇 ...

  2. Hadoop mapreduce框架简介

    传统hadoop MapReduce架构(老架构) 从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 1.首先用户程序 (JobClient) 提交了一个 job,job 的信息会 ...

  3. Hadoop学习笔记:MapReduce框架详解

    原文:http://blog.jobbole.com/84089/ 原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手, ...

  4. mapreduce框架详解

    开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能 ...

  5. 传统MapReduce框架

    传统的MapReduce框架是google于2004年在论文:"MapReduce: Simplified Data Processing on Large Clusters"提出 ...

  6. Hadoop 新 MapReduce 框架 Yarn 详解

    Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储 ...

  7. hadoop 学习笔记:mapreduce框架详解

    开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能 ...

  8. MapReduce框架中map、reduce方法的运行机制

    MapReduce框架中map.reduce方法的运行机制 Hadoop的API中提供了Mapper和Reducer抽象类,分别有个抽象map()方法和reduce()方法,使用时只需实现该抽象类和抽 ...

  9. MapReduce框架在Yarn上的具体解释

    MapReduce任务解析 在YARN上一个MapReduce任务叫做一个Job. 一个Job的主程序在MapReduce框架上实现的应用名称叫MRAppMaster. MapReduce任务的Tim ...

  10. 老司机带你用 Go 语言实现 MapReduce 框架

      MapReduce 是 Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算.简而言之,就是将任务切分成很小的任务然后一个一个区的执行最后汇总,这就像小时候我们老师经常教育我 ...

最新文章

  1. 使用python函数计算3.5四舍五入的结果_python 数字的四舍五入-Go语言中文社区
  2. docker中部署Nginx
  3. TOMCAT问题总结
  4. 我通过了 Google 技术面试,所以你也能行!
  5. 详解云安全攻防模型,这些攻击战略和战术越早知道越好!
  6. spring 数组中随机取几个_最新redux-spring前端模块化框架
  7. 怎么让jsp中的按钮置灰不能使用_拆解按钮规范
  8. WINDOWS下,找包含特殊字串的文件的解决办法
  9. 美瞳微商如何引流?微商卖美瞳怎么宣传?美瞳微商如何引流人脉
  10. 国美做手机、天猫玩魔盒……电商做产品到底会怎么辣眼睛
  11. overleaf表格_latex插入表格心得
  12. CSDN日报190910:程序员都秃头,商务个个是人精
  13. 用html给图片加像素,更改照片像素和大小
  14. 吉首大学2019年程序设计竞赛(重现赛)A:SARS病毒(找规律 or 推公式+欧拉降幂+快速幂)
  15. CheckBox和ListView的结合使用
  16. vue导出excel加一个进度条_vue 实现excel导出功能
  17. 知物由学 | “群控软件”助长黑灰产套利的零和游戏,硬核技术打击隐秘的不公
  18. FinalShell 远程工具(即xshell,xftp,远程桌面连接一体)
  19. Unity InputField输入框调用win10平板虚拟键盘
  20. 在线网校平台搭建的流程

热门文章

  1. 转换并压缩视频的小技巧
  2. android高仿微信聊天消息列表自由复制文字,双击查看文本内容
  3. Android开发之获取网络类型(WIFI、2G、3G、4G)和运营商名称
  4. 【React生命周期】
  5. leetcode 537 py 中map函数用法
  6. 你不能不了解的《3P通道+3P功能》
  7. PAT5-06. 航空公司VIP客户查询
  8. 简化行政地图边界 简化地图边界的方法 GeoPandas
  9. GNU Radio 实数与复数信号分析
  10. RESTful客户端库:RestClient