Table of Contents

  • 1. What determines the number of MapTasks
  • 2. How to adjust the number of MapTasks
    • 2.1 Increase the number of maps: lower maxSize (only effective when smaller than blockSize, e.g. 100 bytes)
    • 2.2 Decrease the number of maps: raise minSize (only effective when larger than blockSize, e.g. 250 MB)
    • 2.3 Not usually adjusted in production, but know the principle
  • 3. What determines the number of ReduceTasks
    • 3.1 Logs shown when running the Hive shell
    • 3.2 The official documentation for these three parameters
    • 3.3 Source-code analysis: how Hive dynamically computes the number of reducers via Method 1
  • 4. How to adjust the number of ReduceTasks
    • 4.1 hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max
    • 4.2 mapreduce.job.reduces
    • 4.3 Cases where setting the number of reducers has no effect
      • 4.3.1 order by
      • 4.3.2 Cartesian product
      • 4.3.3 The map-side output is very small

1. What determines the number of MapTasks

 (1) Number of input files   (2) File sizes   (3) blockSize

In short: the number of files in the input directory determines how many maps are launched. The application runs one map per split, and in general each input file gets at least one map split. If an input file is larger than the HDFS block size (128 MB), two or more maps will run against that single file.

What "the number of map tasks is determined by the files" means mathematically:

A map in MapReduce never processes data across file boundaries, so the number of maps is at least the number of input files (mapNum >= inputFileNum). The final number of maps is therefore:
mapNum = max(computeMapNum, inputFileNum)
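As a minimal sketch of this formula (not Hive/Hadoop source; the splitSize and file sizes below are assumed values), counting splits per file shows why the map count can never drop below the number of input files:

// Illustration only: per-file split counting under an assumed, fixed splitSize.
// File sizes and splitSize are hypothetical values, not taken from any real job.
public class MapCountSketch {
    public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;            // assume splitSize = 128 MB
        long[] fileSizes = {300L * 1024 * 1024,          // 300 MB -> 3 splits
                            10L * 1024 * 1024,           // 10 MB  -> 1 split
                            128L * 1024 * 1024};         // 128 MB -> 1 split
        int mapNum = 0;
        for (long size : fileSizes) {
            // a map never crosses file boundaries, so each file contributes at least one split
            mapNum += (int) Math.ceil((double) size / splitSize);
        }
        // mapNum is therefore always >= the number of input files
        System.out.println("input files = " + fileSizes.length + ", map tasks = " + mapNum); // 3 files, 5 maps
    }
}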

The following sections describe how file size and blockSize affect the number of map tasks.

2. How to adjust the number of MapTasks

 How to change the number of maps without changing blockSize

Formula for computing splitSize:

// The adjustments below all follow this formula (for details see my other post:
// https://blog.csdn.net/lihuazaizheli/article/details/107370695)
splitSize = Min(maxSize, Max(minSize, blockSize))

When minSize < blockSize < maxSize : splitSize = blockSize
When blockSize < minSize < maxSize : splitSize = minSize  -- used to decrease the number of maps: raise minSize (above blockSize)
When minSize < maxSize < blockSize : splitSize = maxSize  -- used to increase the number of maps: lower maxSize (below blockSize)
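The three cases are easy to verify by hand. Here is a minimal sketch (illustrative only; the minSize/maxSize/blockSize values are assumptions, not values read from a real cluster) that evaluates the formula for each case:

// Illustration of splitSize = min(maxSize, max(minSize, blockSize)) for the three cases above.
// All parameter values below are assumed for demonstration.
public class SplitSizeSketch {
    static long splitSize(long minSize, long maxSize, long blockSize) {
        return Math.min(maxSize, Math.max(minSize, blockSize));
    }
    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // case 1: minSize < blockSize < maxSize -> splitSize = blockSize (128 MB)
        System.out.println(splitSize(1, 256 * mb, 128 * mb) / mb);       // 128
        // case 2: blockSize < minSize < maxSize -> splitSize = minSize (fewer maps), e.g. 250 MB
        System.out.println(splitSize(250 * mb, 256 * mb, 128 * mb) / mb); // 250
        // case 3: minSize < maxSize < blockSize -> splitSize = maxSize (more maps), e.g. 100 bytes
        System.out.println(splitSize(1, 100, 128 * mb));                  // 100
    }
}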

If local test mode is enabled, you need to turn it off with set hive.exec.mode.local.auto=false; otherwise the logs will not show the actual numbers of map and reduce tasks.

Key conclusions:
(1) [Important] If nothing is configured, the default number of maps is determined by the block size:
default_num = total input size / block_size
(2) The parameter mapred.map.tasks sets the number of maps the programmer wants, but it only takes effect when it is larger than default_num.
(3) These parameters are usually used together with Hive; the more general Hive-side settings are listed below (section 2.1 uses the FileInputFormat-style settings).
a. minSize-related parameters:

hive (default)> set  mapred.min.split.size;
mapred.min.split.size=1
hive (default)> set mapred.min.split.size.per.node;
mapred.min.split.size.per.node=1
hive (default)> set mapred.min.split.size.per.rack;
mapred.min.split.size.per.rack=1

b. maxSize-related parameter:

hive (default)> set  mapred.max.split.size;
mapred.max.split.size=256000000

c. Input-format (local combine) parameter:

hive (default)> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

# Explanation from the official docs:
hive.input.format
Default Value: org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
Added In: Hive 0.5.0
The default input format. Set this to HiveInputFormat if you encounter problems with CombineHiveInputFormat.
Also see:
Configuration Properties#hive.tez.input.format

2.1 Increase the number of maps: lower maxSize (only effective when smaller than blockSize, e.g. 100 bytes)

Note: the examples below use FileInputFormat-style input.
Method 1:

(1) The default value of maxSize is 256000000 (about 256 MB)
hive (default)> set mapreduce.input.fileinputformat.split.maxsize;
mapreduce.input.fileinputformat.split.maxsize=256000000
(2) Lower this value to 100 bytes, so that splitSize = 100
set mapreduce.input.fileinputformat.split.maxsize=100;
select count(1) from empt;
The log shows: number of mappers: 2; number of reducers: 1

Method 2:

Set mapred.map.tasks to a relatively large value (larger than default_num = total input size / splitSize).

2.2 Decrease the number of maps: raise minSize (only effective when larger than blockSize, e.g. 250 MB)

(1) The default value of minSize is 1
hive (default)> set mapreduce.input.fileinputformat.split.minsize;
mapreduce.input.fileinputformat.split.minsize=1
(2) Raise minSize to 256000000 (about 256 MB), so that splitSize = 256 MB
set mapreduce.input.fileinputformat.split.minsize=256000000;
select count(1) from empt;

2.3 Not usually adjusted in production, but know the principle

Suppose a file with two columns is 128 MB. With the default blockSize of 128 MB there is only one map; in that case you can increase the number of maps to process the data faster, as sketched below.
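A back-of-the-envelope check of that scenario (illustrative only; the 64 MB maxSize target below is an assumed value, not a recommendation):

// Illustration of the 2.3 scenario: a single 128 MB file with blockSize = 128 MB.
// Lowering maxSize below blockSize (an assumed 64 MB here) doubles the number of maps.
public class Scenario23Sketch {
    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long fileSize  = 128 * mb;
        long blockSize = 128 * mb;

        long defaultSplit = Math.min(256_000_000L, Math.max(1, blockSize)); // default maxSize ~256 MB -> splitSize = 128 MB
        long tunedSplit   = Math.min(64 * mb,      Math.max(1, blockSize)); // maxSize = 64 MB -> splitSize = 64 MB

        System.out.println(Math.ceil((double) fileSize / defaultSplit)); // 1.0 -> one map
        System.out.println(Math.ceil((double) fileSize / tunedSplit));   // 2.0 -> two maps
    }
}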

3. What determines the number of ReduceTasks

The number of reducers determines the number of output files.
(1) Method 1 (the source-code analysis is given later):
    hive.exec.reducers.bytes.per.reducer   parameter 1 (default: 256 MB)
    hive.exec.reducers.max                 parameter 2 (default: 1009)
    number of reducers = min(parameter 2, total input size / parameter 1)
Parameter 2 is normally left unchanged, so at ordinary cluster scale Hive automatically decides the number of reducers as: total input size / hive.exec.reducers.bytes.per.reducer
(2) Method 2: set mapreduce.job.reduces=1;
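A quick, illustrative evaluation of Method 1's formula (the 10 GB input size is an assumed example; the real logic is the estimateReducers source shown in section 3.3):

// Illustration of: reducers = min(hive.exec.reducers.max, ceil(totalInputSize / bytes.per.reducer))
// The 10 GB input size is an assumed value for demonstration.
public class ReducerEstimateSketch {
    public static void main(String[] args) {
        long bytesPerReducer = 256_000_000L;              // hive.exec.reducers.bytes.per.reducer (default 256 MB)
        int  maxReducers     = 1009;                      // hive.exec.reducers.max (default 1009)
        long totalInputSize  = 10L * 1024 * 1024 * 1024;  // assume 10 GB of input

        int reducers = (int) Math.ceil((double) totalInputSize / bytesPerReducer);
        reducers = Math.max(1, reducers);
        reducers = Math.min(maxReducers, reducers);
        System.out.println(reducers); // 42 with these assumed numbers
    }
}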

3.1 Logs shown when running the Hive shell

Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>

These three parameters determine the number of reducers; they are explained in detail below.

3.2 The official documentation for these three parameters

(1)hive.exec.reducers.bytes.per.reducer

Default Value: 1,000,000,000 prior to Hive 0.14.0; 256 MB (256,000,000) in Hive 0.14.0 and later
Added In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)
Size per reducer. The default in Hive 0.14.0 and earlier is 1 GB, that is, if the input size is 10 GB then 10 reducers will be used.  # note this sentence
In Hive 0.14.0 and later the default is 256 MB, that is, if the input size is 1 GB then 4 reducers will be used.

# In the CDH 5.16.2 Hive environment it is 128 MB:
hive (default)> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=134217728

(2)hive.exec.reducers.max

Default Value: 999 prior to Hive 0.14.0; 1009 in Hive 0.14.0 and later
Added In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)
Maximum number of reducers that will be used. If the one specified in the configuration property Configuration Properties#mapred.reduce.tasks is negative, Hive will use this as the maximum number of reducers when automatically determining the number of reducers.

# In the CDH 5.16.2 Hive environment:
hive (default)> set hive.exec.reducers.max;
hive.exec.reducers.max=500

(3)mapreduce.job.reduces

Default Value: -1 (disabled)
Added in: Hive 1.1.0 with HIVE-7567
Sets the number of reduce tasks for each Spark shuffle stage (e.g. the number of partitions when performing a Spark shuffle). This is set to -1 by default (disabled); instead the number of reduce tasks is dynamically calculated based on Hive data statistics. Setting this to a constant value sets the same number of partitions for all Spark shuffle stages.

# Key sentence: by default (-1) this is disabled, and "the number of reduce tasks is
# dynamically calculated based on Hive data statistics."

# In the CDH 5.16.2 Hive environment:
hive (default)> set mapreduce.job.reduces;
mapreduce.job.reduces=-1

3.3 Source-code analysis: how Hive dynamically computes the number of reducers via Method 1

In the MapRedTask class of the org.apache.hadoop.hive.ql.exec.mr package:

// Call chain
MapRedTask
  |---- setNumberOfReducers
          |---- estimateNumberOfReducers
                  |---- estimateReducers

(1) The core method setNumberOfReducers

/**
 * Set the number of reducers for the mapred work.
 */
private void setNumberOfReducers() throws IOException {
  ReduceWork rWork = work.getReduceWork();
  // this is a temporary hack to fix things that are not fixed in the compiler
  // Read the reducer count configured from outside: rWork.getNumReduceTasks()
  Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();

  if (rWork == null) {
    console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
  } else {
    if (numReducersFromWork >= 0) {
      // The reducer count was set manually (>= 0), so just log it
      console.printInfo("Number of reduce tasks determined at compile time: "
          + rWork.getNumReduceTasks());
    } else if (job.getNumReduceTasks() > 0) {
      // The reducer count comes from the job configuration; copy it into the work
      int reducers = job.getNumReduceTasks();
      rWork.setNumReduceTasks(reducers);
      console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
          + reducers);
    } else {
      // No reducer count was set manually, so estimate it
      if (inputSummary == null) {
        inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
      }
      // [Key call] estimateNumberOfReducers
      int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
          work.isFinalMapRed());
      rWork.setNumReduceTasks(reducers);
      console.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
          + reducers);
    }
    // The log lines seen in the Hive shell are printed here
    console.printInfo("In order to change the average load for a reducer (in bytes):");
    console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
    console.printInfo("In order to limit the maximum number of reducers:");
    console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
    console.printInfo("In order to set a constant number of reducers:");
    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
  }
}

(2) If the number of reducers is not set manually, how does Hive compute it dynamically?

int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
    work.isFinalMapRed());

/**
 * Estimate the number of reducers needed for this job, based on job input,
 * and configuration parameters.
 *
 * The output of this method should only be used if the output of this
 * MapRedTask is not being used to populate a bucketed table and the user
 * has not specified the number of reducers to use.
 *
 * @return the number of reducers.
 */
public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
    MapWork work, boolean finalMapRed) throws IOException {
  // bytesPerReducer defaults to 256 MB: BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L)
  long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
  // maxReducers defaults to 1009: MAXREDUCERS("hive.exec.reducers.max", 1009)
  int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);

  // Compute totalInputFileSize
  double samplePercentage = getHighestSamplePercentage(work);
  long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);

  // if all inputs are sampled, we should shrink the size of reducers accordingly.
  if (totalInputFileSize != inputSummary.getLength()) {
    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
        + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
  } else {
    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
  }

  // If this map reduce job writes final data to a table and bucketing is being inferred,
  // and the user has configured Hive to do this, make sure the number of reducers is a
  // power of two
  boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
      finalMapRed && !work.getBucketedColsByDirectory().isEmpty();

  // [The method that actually computes the reducer count] Source-reading tip: the method
  // called in the return statement is the core method.
  return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);
}

(3) estimateReducers, the method that computes the number of reducers

public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
    int maxReducers, boolean powersOfTwo) {
  // Suppose totalInputFileSize is 1000 MB
  // bytes = Math.max(1000 MB, 256 MB) = 1000 MB
  double bytes = Math.max(totalInputFileSize, bytesPerReducer);
  // reducers = (int) Math.ceil(1000 MB / 256 MB) = 4
  // This also means that if totalInputFileSize is smaller than 256 MB, reducers = 1;
  // otherwise it is computed by this division.
  int reducers = (int) Math.ceil(bytes / bytesPerReducer);
  // Math.max(1, 4) = 4, so reducers is still 4
  reducers = Math.max(1, reducers);
  // Math.min(1009, 4) = 4, so reducers is still 4
  reducers = Math.min(maxReducers, reducers);

  int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
  int reducersPowerTwo = (int)Math.pow(2, reducersLog);

  if (powersOfTwo) {
    // If the original number of reducers was a power of two, use that
    if (reducersPowerTwo / 2 == reducers) {
      // nothing to do
    } else if (reducersPowerTwo > maxReducers) {
      // If the next power of two greater than the original number of reducers is greater
      // than the max number of reducers, use the preceding power of two, which is strictly
      // less than the original number of reducers and hence the max
      reducers = reducersPowerTwo / 2;
    } else {
      // Otherwise use the smallest power of two greater than the original number of reducers
      reducers = reducersPowerTwo;
    }
  }
  return reducers;
}
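The power-of-two branch (used only when Hive is inferring bucketing, per the comment in estimateNumberOfReducers above) is easy to trace with sample numbers. Below is a standalone re-run of just that rounding step; the sample reducer counts are assumed, and maxReducers uses the 1009 default quoted earlier.

// Standalone trace of the power-of-two rounding at the end of estimateReducers.
// Sample inputs are assumed values for illustration.
public class PowerOfTwoSketch {
    static int roundForBuckets(int reducers, int maxReducers) {
        int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;
        int reducersPowerTwo = (int) Math.pow(2, reducersLog);
        if (reducersPowerTwo / 2 == reducers) {
            return reducers;               // already a power of two: keep it
        } else if (reducersPowerTwo > maxReducers) {
            return reducersPowerTwo / 2;   // next power of two exceeds maxReducers: use the previous one
        } else {
            return reducersPowerTwo;       // otherwise round up to the next power of two
        }
    }
    public static void main(String[] args) {
        System.out.println(roundForBuckets(4, 1009));    // 4   (already a power of two)
        System.out.println(roundForBuckets(5, 1009));    // 8   (rounded up)
        System.out.println(roundForBuckets(1000, 1009)); // 512 (1024 > 1009, fall back)
    }
}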

4. How to adjust the number of ReduceTasks

There are two ways to adjust the number of reducers in Hive:

4.1 hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max

a. Explanation: these two parameters are generally not adjusted in production. If we do not specify the number of reducers, Hive uses them to compute the reducer count dynamically, as shown above.
b. Generally not adjusted in production.

4.2 mapreduce.job.reduces

a. Explanation: in production the number of reducers is usually not adjusted much either. But sometimes too many reducers produce too many small files on HDFS; in that case you can lower mapreduce.job.reduces to reduce the number of output files on HDFS.
b. When manually adjusting the reducer count in production, use this parameter.
c. Example: the log shows number of reducers: 5

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select * from empt sort by length(ename);
Query ID = root_20200725161515_730c6c65-9945-4cec-bbaa-e284bcdbb3ce
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1594965583131_0089, Tracking URL = http://hadoop:7776/proxy/application_1594965583131_0089/
Kill Command = /hadoop/hadoop/bin/hadoop job  -kill job_1594965583131_0089
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 5

4.3 Cases where setting the number of reducers has no effect

4.3.1 order by

When the SQL uses ORDER BY: ORDER BY is a global sort and can only be done by a single reducer, so adjusting the reducer count has no effect.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select * from empt order by length(ename);
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

4.3.2 Cartesian product

In this example the Cartesian product does not go through a reduce phase at all, so no reducer setting has any effect. The underlying reason is that the data volume is small and Hive 0.10+ enables map join by default.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select a.*,b.* from empt a join dept b;
Warning: Map Join MAPJOIN[7][bigTable=a] in task 'Stage-3:MAPRED' is a cross product
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0

Turn off map join in hive-site.xml:

<property>
  <name>hive.auto.convert.join</name>
  <value>false</value>
  <description>Whether to enable map join</description>
</property>

Run the query again. The logs show that a Cartesian product is also effectively a global operation and can only be handled by a single reducer; even with 5 reducers requested there is no effect.

hive (default)> set mapreduce.job.reduces=5;
hive (default)> select a.*,b.* from empt a join dept b;
Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Stage-3:MAPRED' is a cross product
Warning: Shuffle Join JOIN[4][tables = [a, b]] in Stage 'Stage-1:MAPRED' is a cross product
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1

4.3.3 The map-side output is very small

The estimateReducers method shown above contains these three lines:

int reducers = (int) Math.ceil(bytes / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);

If the map-side output (say only 1 MB) is smaller than hive.exec.reducers.bytes.per.reducer (default: 256 MB), with maxReducers defaulting to 1009, the calculation is:

double bytes = Math.max(1 MB, 256 MB) = 256 MB;
int reducers = (int) Math.ceil(256 MB / 256 MB) = 1;
reducers = Math.max(1, 1) = 1;
reducers = Math.min(1009, 1) = 1;

So in this case, even if you set mapreduce.job.reduces=10, it does not help; you still end up with only one reducer.
