Hive mapreduce的map与reduce个数由什么决定?
文章目录
- 1.MapTask的数量决定
- 2.如何来调整MapTask的数量
- 2.1 增加map的数量 : 调小maxsize (要小于blockSize才有效,比如100byte)
- 2.2 减少map的数量 : 调大minSize (要大于blockSize才有效,比如250M)
- 2.3生产中一般不调整,但是要知道原理。
- 3. ReduceTask的数量决定
- 3.1 在执行hive shell的时候可以看到下列日志
- 3.2 官网对这三个参数的解释
- 3.3 通过源码分析 hive是如何通过方式一 动态计算reduce的个数的
- 4.如何调整reduceTask的数量
- 4.1.hive.exec.reducers.bytes.per.reducer与hive.exec.reducers.max
- 4.2.mapreduce.job.reduces
- 4.3 设置reduce数量无效的情况
- 4.3.1 order by
- 4.3.2 笛卡尔积
- 4.3.3 map端输出的数据量很小
1.MapTask的数量决定
(1)文件的个数 (2)文件大小 (3)blocksize
简单来说:输入的目录中文件的数量决定多少个map会被运行起来,应用针对每一个分片运行一个map,一般而言,对于每一个输入的文件会有一个map split。如果输入文件太大,超过了hdfs块的大小(128M)那么对于同一个输入文件我们会有多余2个的map运行起来
mapTask的数量由文件决定的数学含义:
MapReduce的每一个map处理数据是不能跨越文件的。也就是说minMapNum>=inputFileNum,所以,最终的map个数应该为:
mapNum=max(computeMapNum,inputFileNum)
下面讲述文件大小和blockSize是如果影响mapTask的数量的。
2.如何来调整MapTask的数量
在不改变blockSize的情况下,变更map的情况
计算splitSize的公式:
//下面的调整都是按照此公式进行调整(具体解释看我的另外一篇博客 https://blog.csdn.net/lihuazaizheli/article/details/107370695)splitSize=Min(maxSize,Max(minSize,blockSize))当 minSize《 blockSize 《 maxSize 时,splitSize=blockSize当 blockSize《 minSize《 maxSize 时,splitSize=minSize ,可应用于减少map数量,调大minSize(至少大于blockSize)当 minSize《 maxSize 《blockSize 时,splitSize=maxSize , 可应用于增加map数量,调小maxSize(至少小于blockSize)
如果开启了本地测试模式,需要set hive.exec.mode.local.auto=fase关闭才能够看出具的map与reduce个数的日志
重要结论:
(1)【重要】如果不进行任何设置,默认的map个数是和blcok_size相关的。
default_num = 输入文件的整体大小 / block_size;
(2)可以通过参数mapred.map.tasks来设置程序员期望的map个数,但是这个个数只有在大于default_num的时候,才会生效。
(3)一般是结合hive使用,在hive中的更加通用的配置(下文2.1中介绍的是fileinputformat的输入形式配置)
a.minSize相关的参数:
hive (default)> set mapred.min.split.size;
mapred.min.split.size=1
hive (default)> set mapred.min.split.size.per.node;
mapred.min.split.size.per.node=1
hive (default)> set mapred.min.split.size.per.rack;
mapred.min.split.size.per.rack=1
b.maxSize相关的参数
hive (default)> set mapred.max.split.size;
mapred.max.split.size=256000000
c.本地聚合参数
hive (default)> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat#官网解释
hive.input.format
Default Value: org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
Added In: Hive 0.5.0
The default input format. Set this to HiveInputFormat if you encounter problems with CombineHiveInputFormat.
Also see:
Configuration Properties#hive.tez.input.format
2.1 增加map的数量 : 调小maxsize (要小于blockSize才有效,比如100byte)
注意,下面的例子是fileinputformat格式的数据
方式一:
(1)maxsize的默认值为256Mhive (default)> set mapreduce.input.fileinputformat.split.maxsize;mapreduce.input.fileinputformat.split.maxsize=256000000mapreduce.input.fileinputformat.split.maxsize(2) 将次值调小为100字节,此时splitSize=100;set mapreduce.input.fileinputformat.split.maxsize=100;select count(1) from empt;可以看到日志number of mappers: 2; number of reducers: 1
方式二:
设置mapred.map.tasks 为一个较大的值(大于default_num=输入文件整体大小/splitSize)。
2.2 减少map的数量 : 调大minSize (要大于blockSize才有效,比如250M)
(1)minsize的默认值是1hive (default)> set mapreduce.input.fileinputformat.split.minsize;mapreduce.input.fileinputformat.split.minsize=1(2)调整minsize为256M,此时splitSize=256Mset mapreduce.input.fileinputformat.split.minsize=256000000;select count(1) from empt;
2.3生产中一般不调整,但是要知道原理。
一个文件有2列数据,有128M, 默认blockSize为128M的情况下,只有1个map,这个时候可以将map数量调大,以更快的处理数据。
3. ReduceTask的数量决定
reduce个数决定文件的输出个数(1)决定方式一 后面看源码解析 hive.exec.reducers.bytes.per.reducer 参数1(default:256M)hive.exec.reducers.max 参数2 (default:1009)min(参数2, 总数据量/参数1)一般参数2的值不变动,在普通集群规模下,hive根据数据量自动决定reduce的个数为: 输入总数据量/hive.exec.reducers.bytes.per.reducer(2)决定方式二set mapreduce.job.reduces=1;
3.1 在执行hive shell的时候可以看到下列日志
Number of reduce tasks determined at compile time: 1In order to change the average load for a reducer (in bytes):set hive.exec.reducers.bytes.per.reducer=<number>In order to limit the maximum number of reducers:set hive.exec.reducers.max=<number>In order to set a constant number of reducers:set mapreduce.job.reduces=<number>这三个参数决定了reduce的个数,下面详细解释
3.2 官网对这三个参数的解释
(1)hive.exec.reducers.bytes.per.reducer
Default Value: 1,000,000,000 prior to Hive 0.14.0; 256 MB (256,000,000) in Hive 0.14.0 and laterAdded In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)Size per reducer. The default in Hive 0.14.0 and earlier is 1 GB, that is, if the input size is 10 GB then 10 reducers will be used. # 重点看这句话In Hive 0.14.0 and later the default is 256 MB, that is, if the input size is 1 GB then 4 reducers will be used.# chd5.16.2 hive环境中是128Mhive (default)> set hive.exec.reducers.bytes.per.reducer;hive.exec.reducers.bytes.per.reducer=134217728
(2)hive.exec.reducers.max
Default Value: 999 prior to Hive 0.14.0; 1009 in Hive 0.14.0 and laterAdded In: Hive 0.2.0; default changed in 0.14.0 with HIVE-7158 (and HIVE-7917)Maximum number of reducers that will be used. If the one specified in the configuration property Configuration Properties#mapred.reduce.tasks is negative, Hive will use this as the maximum number of reducers when automatically determining the number of reducers# chd5.16.2 hive环境hive (default)> set hive.exec.reducers.max;hive.exec.reducers.max=500
(3)mapreduce.job.reduces
Default Value: -1 (disabled)Added in: Hive 1.1.0 with HIVE-7567Sets the number of reduce tasks for each Spark shuffle stage (e.g. the number of partitions when performing a Spark shuffle). This is set to -1 by default (disabled); instead the number of reduce tasks is dynamically calculated based on Hive data statistics. Setting this to a constant value sets the same number of partitions for all Spark shuffle stages.# 重点读这句话, 默认-1是禁用,并且是根据hive的数据量动态计算reduce的个数the number of reduce tasks is dynamically calculated based on Hive data statistics.# chd5.16.2 hive环境hive (default)> set mapreduce.job.reduces;mapreduce.job.reduces=-1
3.3 通过源码分析 hive是如何通过方式一 动态计算reduce的个数的
在org.apache.hadoop.hive.ql.exec.mr包下的 MapRedTask类中//方法类调用逻辑MapRedTask | ----setNumberOfReducers | ---- estimateNumberOfReducers|---- estimateReducers
(1)核心方法setNumberOfReducers
/*** Set the number of reducers for the mapred work.*/private void setNumberOfReducers() throws IOException {ReduceWork rWork = work.getReduceWork();// this is a temporary hack to fix things that are not fixed in the compiler// 获取通过外部传参设置reduce数量的值 rWork.getNumReduceTasks() Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();if (rWork == null) {console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");} else {if (numReducersFromWork >= 0) {//如果手动设置了reduce的数量 大于等于0 ,则进来,控制台打印日志console.printInfo("Number of reduce tasks determined at compile time: "+ rWork.getNumReduceTasks());} else if (job.getNumReduceTasks() > 0) {//如果手动设置了reduce的数量,获取配置中的值,并传入到work中int reducers = job.getNumReduceTasks();rWork.setNumReduceTasks(reducers);console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "+ reducers);} else {//如果没有手动设置reduce的数量,进入方法if (inputSummary == null) {inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);}//【重中之中】estimateNumberOfReducers int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),work.isFinalMapRed());rWork.setNumReduceTasks(reducers);console.printInfo("Number of reduce tasks not specified. Estimated from input data size: "+ reducers);}//hive shell中所看到的控制台打印日志就在这里console.printInfo("In order to change the average load for a reducer (in bytes):");console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname+ "=<number>");console.printInfo("In order to limit the maximum number of reducers:");console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname+ "=<number>");console.printInfo("In order to set a constant number of reducers:");console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS+ "=<number>");}}
(2)如果没有手动设置reduce的个数,hive是如何动态计算reduce个数的?
int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),work.isFinalMapRed());/*** Estimate the number of reducers needed for this job, based on job input,* and configuration parameters.** The output of this method should only be used if the output of this* MapRedTask is not being used to populate a bucketed table and the user* has not specified the number of reducers to use.** @return the number of reducers.*/public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,MapWork work, boolean finalMapRed) throws IOException {// bytesPerReducer默认值为256M BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L)long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);//maxReducers的默认值1009 MAXREDUCERS("hive.exec.reducers.max", 1009)int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);//对totalInputFileSize的计算double samplePercentage = getHighestSamplePercentage(work);long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);// if all inputs are sampled, we should shrink the size of reducers accordingly.if (totalInputFileSize != inputSummary.getLength()) {LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="+ maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);} else {LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="+ maxReducers + " totalInputFileSize=" + totalInputFileSize);}// If this map reduce job writes final data to a table and bucketing is being inferred,// and the user has configured Hive to do this, make sure the number of reducers is a// power of twoboolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&finalMapRed && !work.getBucketedColsByDirectory().isEmpty();//【真正计算reduce个数的方法】看源码的技巧return的方法是重要核心方法return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);}
(3) 计算reduce个数的方法 estimateReducers
public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,int maxReducers, boolean powersOfTwo) {// 假设totalInputFileSize 1000M// bytes=Math.max(1024M,256M)=1000Mdouble bytes = Math.max(totalInputFileSize, bytesPerReducer);//reducers=(int)Math.ceil(1000M/256M)=4 此公式说明如果totalInputFileSize 小于256M ,则reducers=1 ;繁殖 则通过int reducers = (int) Math.ceil(bytes / bytesPerReducer);//Math.max(1, 4)=4 ,reducers的结果还是4reducers = Math.max(1, reducers);//Math.min(1009,4)=4; reducers的结果还是4reducers = Math.min(maxReducers, reducers);int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;int reducersPowerTwo = (int)Math.pow(2, reducersLog);if (powersOfTwo) {// If the original number of reducers was a power of two, use thatif (reducersPowerTwo / 2 == reducers) {// nothing to do} else if (reducersPowerTwo > maxReducers) {// If the next power of two greater than the original number of reducers is greater// than the max number of reducers, use the preceding power of two, which is strictly// less than the original number of reducers and hence the maxreducers = reducersPowerTwo / 2;} else {// Otherwise use the smallest power of two greater than the original number of reducersreducers = reducersPowerTwo;}}return reducers;}
4.如何调整reduceTask的数量
调整hive的reduce个数的两种方法:
4.1.hive.exec.reducers.bytes.per.reducer与hive.exec.reducers.max
a.解释:在生产中,一般不调整这两个参数,这两个参数是 如果我们不指定hive的reduce个数,hive程序通过上面两个参数进行动态计算 决定reduce的个数。b.生产一般不调整
4.2.mapreduce.job.reduces
a.解释: 一般在生产中对reduce的个数也不做太多调整,但是有时候reduce的个数太多,hdfs上的小文件太多。 此时就可以通过 调小mapreduce.job.reduces的个数,来减少hdfs上输出文件的个数。b.生产手动调整reduce个数,使用此参数c.案例 可以看到number of reducers的个数为5hive (default)>set mapreduce.job.reduces=5;hive (default)> select * from empt sort by length(ename);Query ID = root_20200725161515_730c6c65-9945-4cec-bbaa-e284bcdbb3ceTotal jobs = 1Launching Job 1 out of 1Number of reduce tasks not specified. Defaulting to jobconf value of: 5In order to change the average load for a reducer (in bytes):set hive.exec.reducers.bytes.per.reducer=<number>In order to limit the maximum number of reducers:set hive.exec.reducers.max=<number>In order to set a constant number of reducers:set mapreduce.job.reduces=<number>Starting Job = job_1594965583131_0089, Tracking URL = http://hadoop:7776/proxy/application_1594965583131_0089/Kill Command = /hadoop/hadoop/bin/hadoop job -kill job_1594965583131_0089Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 5
4.3 设置reduce数量无效的情况
4.3.1 order by
sql中使用了order by ,由于order by是全局排序,只能在一个reduce中完成,无论怎么调整reduce的数量都是无效的。hive (default)>set mapreduce.job.reduces=5;hive (default)> select * from empt order by length(ename);Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
4.3.2 笛卡尔积
可以看到此案例中的笛卡尔积不走reduce,无论设置多少个都无效。其实本质原因是 数据量比较小,hive的0.10以上版本都是将Mapjoin开启的。hive (default)>set mapreduce.job.reduces=5;hive (default)> select a.*,b.* from empt a join dept b;Warning: Map Join MAPJOIN[7][bigTable=a] in task 'Stage-3:MAPRED' is a cross productHadoop job information for Stage-3: number of mappers: 1; number of reducers: 0关闭掉hive-site.xml中的mapJoin参数<property><name>hive.auto.convert.join</name><value>false</value><description>开启MapJoin功能</description></property>再次执行,可以看到日志,笛卡尔积其实也是全局聚合,只能够一个reduce来处理。即使设置了5个reduce也没有效果。hive (default)> set mapreduce.job.reduces=5;hive (default)> select a.*,b.* from empt a join dept b;Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Stage- 3:MAPRED' is a cross product
Warning: Shuffle Join JOIN[4][tables = [a, b]] in Stage 'Stage-1:MAPRED' is a cross product
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
4.3.3 map端输出的数据量很小
计算reduce个数的方法 estimateReducers中有这三行代码:
int reducers = (int) Math.ceil(bytes / bytesPerReducer);reducers = Math.max(1, reducers);reducers = Math.min(maxReducers, reducers);
如果map端输出的数据量(假如只有1M)小于hive.exec.reducers.bytes.per.reducer(default:256M)参数值,maxReducers默认为1009,可以计算
int reducers = (int) Math.ceil(1 / 256M)=1;reducers = Math.max(1, 1)=1;reducers = Math.min(1009, 1)=1;
所以此时即使你set mapreduce.job.reduces=10是没用的,最后还是只有1个。
Hive mapreduce的map与reduce个数由什么决定?相关推荐
- 运行wordcount的时候显示INFO mapreduce.Job: map 0% reduce 0%
错误提示: [xiaoqiu@s150 /home/xiaoqiu]$ hadoop jar wordcounter.jar com.cr.wordcount.WordcountApp hdfs:// ...
- 使用sqoop将数据从hdfs中导入mysql时,卡在INFO mapreduce.Job: map 100% reduce 0%的解决办法
最近在将hdfs中已经处理好的数据导入到mysql的时候遇到这么一个现象,当任务执行到 INFO mapreduce.Job: map 100% reduce 0% mapreduce任务卡在map1 ...
- map-reduce 、map、reduce
2019独角兽企业重金招聘Python工程师标准>>> map-reduce 过程 中间绿线区域就是shuffle("洗牌")过程:map之后,reduce之前的 ...
- Hive 设置map 和 reduce 的个数
一. 控制hive任务中的map数: 1. 通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置 ...
- Hive 任务卡在 map = 0%, reduce = 0%
Hive 卡在map = 0%, reduce = 0%阶段 解决:增加map个数,设置mapreduce.input.fileinputformat.split.maxsize 小于系统默认值,需要 ...
- 彻底明白Hadoop map和reduce的个数决定因素
Hadoop map和reduce的个数设置,困扰了很多学习Hadoop的成员,为什么设置了配置参数就是不生效那?Hadoop Map和Reduce个数,到底跟什么有关系.首先他的参数很多,而且可能随 ...
- MapReduce Map数 reduce数设置
JobConf.setNumMapTasks(n)是有意义的,结合block size会具体影响到map任务的个数,详见FileInputFormat.getSplits源码.假设没有设置mapred ...
- 关于hive中的reduce个数的设置。
我们都知道在进行hive的查询的时候,设置合理的reduce个数能够使计算的速度加快. 具体的提高速度的方法有下面这些: (1) hive.exec.reducers.bytes.per.redu ...
- MapReduce剖析笔记之五:Map与Reduce任务分配过程
转载:https://www.cnblogs.com/esingchan/p/3940565.html 在上一节分析了TaskTracker和JobTracker之间通过周期的心跳消息获取任务分配结果 ...
最新文章
- nginx LB服务器配置
- JS原生封装时间函数 日期格式过滤
- python xml添加命名空间_XML的命名空间与python解析方法
- gns3中两个路由器分别连接主机然后分析ip数据转发报文arp协议_ARP协议在同网段及跨网段下的工作原理...
- HBuilder Android真机调试
- 暑期训练日志----2018.8.10
- sqlmap安装(python2或python3都行)
- Go语言中rune方法如何使用
- C#_根据银行卡卡号判断银行名称
- 靶机渗透练习81-Momentum:2
- 数据中心机房与机柜理线方法介绍
- 镜像搬运工 skopeo
- java+OpenCV3 +百度OCR(或tesseract) 识别表格数据
- 虚拟化技术(一)——虚拟化简介
- 中国知名食品品牌策划包装设计,哪家实力最强
- 穿(string类的运用)
- VS2019/MFC编程入门——文档、视图和框架:分割窗口
- 一步步学习微软InfoPath2010和SP2010--第四章节--处理SP列表表单(4)--已计算值域
- 专升本英语——五种基本句型、六种从句、两种语态、九种时态、三单
- 用友t6服务器设置映射,能否自定义用友T6 ERP-接口字段映射设置?
热门文章
- NYOJ:458-小光棍数
- 阿里云是干什么用的?针对新手用户的详细解答...
- Linux 运行vcs仿真命令,VCS使用以及命令行调试
- NLP - 微信好友个性签名情感分析( 基于Python开源库snownlp )
- 百度贴吧里见到的一道题
- 经典的shell十三问
- 代码技巧1.类似于登录、注册界面要判断登录账号是不是空,验证码是否正确等,怎么写比较舒服一点?
- android官网被封掉了,只好用这个网站进谷歌了!嘎嘎
- LED背光驱动IC 支持32通道 PIN艾瓦特7039,7088
- android解锁界面分析,Android 7.0 锁屏解锁之向上滑动显示解锁界面分析