http://blog.csdn.net/aijiudu/article/details/72353510

废话不说直接来一张图如下:

从JVM的角度看Map和Reduce

Map阶段包括:

第一读数据:从HDFS读取数据

1、问题:读取数据产生多少个Mapper??

Mapper数据过大的话,会产生大量的小文件,由于Mapper是基于虚拟机的,过多的Mapper创建和初始化及关闭虚拟机都会消耗大量的硬件资源;

Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源;

2、Mapper数量由什么决定??

(1)输入文件数目

(2)输入文件的大小

(3)配置参数

这三个因素决定的。

涉及参数:

mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0
    mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M
    dfs.block.size//block块大小,默认64M
    计算公式:splitSize =  Math.max(minSize, Math.min(maxSize, blockSize));

例如默认情况下:例如一个文件800M,Block大小是128M,那么Mapper数目就是7个。6个Mapper处理的数据是128M,1个Mapper处理的数据是32M;

再例如一个目录下有三个文件大小分别为:5M10M 150M 这个时候其实会产生四个Mapper处理的数据分别是5M,10M,128M,22M。

Mapper是基于文件自动产生的,如果想要自己控制Mapper的个数???

就如上面,5M,10M的数据很快处理完了,128M要很长时间;这个就需要通过参数的控制来调节Mapper的个数。

减少Mapper的个数的话,就要合并小文件,这种小文件有可能是直接来自于数据源的小文件,也可能是Reduce产生的小文件。

设置合并器:(set都是在hive脚本,也可以配置Hadoop)

设置合并器本身:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

set hive.merge.mapFiles=true;

set hive.merge.mapredFiles=true;

set hive.merge.size.per.task=256000000;//每个Mapper要处理的数据,就把上面的5M10M……合并成为一个

一般还要配合一个参数:

set mapred.max.split.size=256000000 // mapred切分的大小

set mapred.min.split.size.per.node=128000000//低于128M就算小文件,数据在一个节点会合并,在多个不同的节点会把数据抓过来进行合并。

Hadoop中的参数:可以通过控制文件的数量控制mapper数量

mapreduce.input.fileinputformat.split.minsize(default:0),小于这个值会合并

mapreduce.input.fileinputformat.split.maxsize 大于这个值会切分

第二处理数据:

Partition说明

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

自定义partitioner的情况:

1、我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

2、解决数据倾斜:另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

3、自定义的Key包含了好几个字段,比如自定义key是一个对象,包括type1,type2,type3,只需要根据type1去分发数据,其他字段用作二次排序。

环形缓冲区

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

第三写数据到磁盘

Mapper中的Kvbuffer的大小默认100M,可以通过mapreduce.task.io.sort.mb(default:100)参数来调整。可以根据不同的硬件尤其是内存的大小来调整,调大的话,会减少磁盘spill的次数此时如果内存足够的话,一般都会显著提升性能。spill一般会在Buffer空间大小的80%开始进行spill(因为spill的时候还有可能别的线程在往里写数据,因为还预留空间,有可能有正在写到Buffer中的数据),可以通过mapreduce.map.sort.spill.percent(default:0.80)进行调整,Map Task在计算的时候会不断产生很多spill文件,在Map Task结束前会对这些spill文件进行合并,这个过程就是merge的过程。mapreduce.task.io.sort.factor(default:10),代表进行merge的时候最多能同时merge多少spill,如果有100个spill个文件,此时就无法一次完成整个merge的过程,这个时候需要调大mapreduce.task.io.sort.factor(default:10)来减少merge的次数,从而减少磁盘的操作;

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Combiner存在的时候,此时会根据Combiner定义的函数对map的结果进行合并,什么时候进行Combiner操作呢???和Map在一个JVM中,是由min.num.spill.for.combine的参数决定的,默认是3,也就是说spill的文件数在默认情况下由三个的时候就要进行combine操作,最终减少磁盘数据;

减少磁盘IO和网络IO还可以进行:压缩,对spill,merge文件都可以进行压缩。中间结果非常的大,IO成为瓶颈的时候压缩就非常有用,可以通过mapreduce.map.output.compress(default:false)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压,在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU,压缩一般可以10倍的减少IO操作,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)参数设置。但这个过程会消耗CPU,适合IO瓶颈比较大。

Shuffle和Reduce阶段包括:

一、Copy

1、由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的,其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取(下载)它所需要的数据(网络传输),Reducer是如何知道要去哪些机器取数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。

2、reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5)调整。默认情况下,每个Reducer只会有5个map端并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下,可以调大该参数;

3、reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数,这是企业级最佳实战。

二、MergeSort

1、这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是说,如果该reduce task的最大heap使用量(通常通过mapreduce.admin.reduce.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7,reducetask的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷(刷磁盘前会先做sortMerge)。这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percent(default0.66)来设定。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束,然后启动磁盘到磁盘的merge方式生成最终的那个文件。

这里需要强调的是,merge有三种形式:1)内存到内存(memToMemMerger)2)内存中Merge(inMemoryMerger)3)磁盘上的Merge(onDiskMerger)具体包括两个:(一)Copy过程中磁盘合并(二)磁盘到磁盘。

(1)内存到内存Merge(memToMemMerger)    Hadoop定义了一种MemToMem合并,这种合并将内存中的map输出合并,然后再写入内存。这种合并默认关闭,可以通过mapreduce.reduce.merge.memtomem.enabled(default:false)

打开,当map输出文件达到mapreduce.reduce.merge.memtomem.threshold时,触发这种合并。

(2)内存中Merge(inMemoryMerger):当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:

配置内存比例:前面提到reduceJVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merge.percent配置为0.66,则当内存中的数据达到330M的时候,会触发合并写入。

配置map输出数量: 通过mapreduce.reduce.merge.inmem.threshold配置。在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。

(3)磁盘上的Merge(onDiskMerger):

(3.1)Copy过程中磁盘Merge:在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。

(3.2)最终磁盘中Merge:当所有map输出都拷贝完毕之后,所有数据被最后合并成一个整体有序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。最后(所以map输出都拷贝到reduce之后)进行合并的map输出可能来自合并后写入磁盘的文件,也可能来及内存缓冲,在最后写入内存的map输出可能没有达到阈值触发合并,所以还留在内存中。

每一轮合并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factor(default:10)来配置。

如上图:Reduce阶段中一个Reduce过程 可能的合并方式为:假设现在有20个map输出文件,合并因子配置为5,则需要4轮的合并。最终的一轮确保合并5个文件,其中包括2个来自前2轮的合并结果,因此原始的20个中,再留出3个给最终一轮。

三、Reduce函数调用(用户自定义业务逻辑)

1、当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段。reducetask真正进入reduce函数的计算阶段,由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,这个参数是控制,reducer需要多少的内存百分比来作为reduce读已经sort好的数据的buffer大小??默认用多大内存呢??默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读处理数据。可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉;

2、Reduce在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。

性能调优

如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助。相关的参数配置列在后面的表格中。

一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加。

运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来设置,默认都是1024M。

map优化

在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb。

map端相关的属性如下表:

reduce优化

在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升。在2008年的TB级别数据排序性能测试中,Hadoop就是通过将reduce的中间数据都保存在内存中胜利的。

reduce端相关属性:

通用优化

Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小。

转载于:https://www.cnblogs.com/felixzh/p/8604188.html

MapReduce过程详解及其性能优化相关推荐

  1. 详解 Android 性能优化

    为什么关注性能 对于一款APP,用户首先关注的是 app的性能,而不是APP本身的属性功能,用户不关心你是否是搞社交,是否搞电商,是否是一款强大的美图滤镜app,用户首先关注的是 性能--性能不好,用 ...

  2. Google官方 详解 Android 性能优化【史诗巨著之内存篇】

    尊重博主原创,如需转载,请附上本文链接http://blog.csdn.net/chivalrousman/article/details/51553114#t16 为什么关注性能 对于一款APP,用 ...

  3. DBA整理的万字详解MySQL性能优化,值得收藏!

    点击关注公众号,实用技术文章及时了解  作者:LanceToBigData  cnblogs.com/zhangyinhua/p/7620964.html 说起MySQL的查询优化,相信大家积累一堆技 ...

  4. 高手详解SQL性能优化十条建议

    1.查询的模糊匹配  尽量避免在一个复杂查询里面使用 LIKE '%parm1%'-- 红色标识位置的百分号会导致相关列的索引无法使用,最好不要用. 解决办法: 其实只需要对该脚本略做改进,查询速度便 ...

  5. MapReduce过程详解

    MapReduce是一种云计算的核心计算模式,是一种分布式运算技术,也是简化的分布式并行编程模式,主要用于大规模并行程序并行问题. MapReduce的主要思想:自动将一个大的计算(程序)拆分成Map ...

  6. IOS UITableView详解二性能优化 LOL游戏人物展示

    为什么80%的码农都做不了架构师?>>>    一 重用UITableViewCell UITableView滑动过程中,屏幕底部的信息上移到屏幕,会创建UITableViewCel ...

  7. 从龟速 11s 到闪电 1s,详解前端性能优化之首屏加载

    点击上方 前端瓶子君,关注公众号 回复算法,加入前端编程面试算法每日一题群 全文共6511字/词,阅读大概需要13分钟,太长不看党请直接移步

  8. 首屏加载从11s到1s,详解前端性能优化

    作者:夜雪暮歌 https://juejin.cn/post/6949896020788690958 全文共6511字/词,阅读大概需要13分钟,太长不看党请直接移步

  9. chia 的 p 图过程详解及优化攻略

    仅仅两个⽉的时间,chia 全⽹算⼒就突破了 15EP,1p 算⼒单⽇收益也从最初的三十多个变成了零点⼏个.在这场激烈的博弈中,⼤家为了更快的⼊场,都想加快 p 图速度,⽽事实证明,较快的 p 图速度 ...

最新文章

  1. PostgreSQL创建数据库时报错:ERROR: source database template1 is being accessed by other users
  2. 哪里的草莓质量又好又便宜呢?水果批发市场(前提你买的得多)
  3. 通过JAVA对HDFS进行操作管理插件
  4. sybase游标使用方法
  5. 使用所有对象共有的方法
  6. Java Code Geeks Andygene Web原型
  7. 牛客网---Java题库(1~10)
  8. 我是如何旅游 5 个月拿到 3 份 Offer 并收入翻倍的
  9. jQuery 工具方法 (全)
  10. 和10位CIO,聊了聊他们今年的OKR
  11. 摩托罗拉linux专属游戏,摩托罗拉E680软件以及游戏应用大全
  12. macOS Big Sur 11.5 (20G71) 虚拟机 ISO 镜像
  13. 关于人脸识别的最全研究!
  14. IntelliJ IDEA2020安装使用(保姆级)
  15. ubuntu18.04安装Nvidia显卡驱动后黑屏及网络、蓝牙驱动消失的解决方案
  16. Python音乐可视化
  17. 华为数字化转型之道 实践篇 第六章 数字化重构业务运作模式
  18. C语言-文件操作-文件打开方式
  19. Linux离线安装Maven
  20. CVPR 2017 paper

热门文章

  1. 过拟合问题——正则化方法
  2. linux 文件系统 簇 浪费空间,Linux rm -rf删除文件不释放空间的解决办法
  3. 小程序背景图满屏_竞赛答题小程序
  4. java agent 监控tomcat_promethues监控tomcat
  5. Linux_PAM_用户之间的信息传递
  6. 三、PHP框架Laravel学习笔记——路由参数、重定向、视图
  7. 九、ES6的箭头函数
  8. python的普通方法、类方法和静态方法
  9. LintCode MySQL 1936. 张三的故事 III
  10. LeetCode 828. 统计子串中的唯一字符(中心扩展)