一 MapTask个数的决定因素

首先,我们需要明确以下几点:

1Map Task个数不能通过配置文件指定

2Map Task个数是在进行文件的切分时动态计算的

3FileInputFormat负责切分文件进行split操作

1.1分析源码:

intmaps = writeSplits(job, submitJobDir);

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,

Path jobSubmitDir) throws IOException,

InterruptedException,ClassNotFoundException {

JobConf jConf = (JobConf)job.getConfiguration();

int maps;

//判断是否采用新的API,现在我们应该都是新的

if (jConf.getUseNewMapper()) {

maps = writeNewSplits(job, jobSubmitDir);

} else {

maps = writeOldSplits(jConf, jobSubmitDir);

}

return maps;

}

private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,

InterruptedException,ClassNotFoundException {

Configuration conf = job.getConfiguration();

//创建FileInputFormat

InputFormat<?, ?> input =

ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

//调用FileInputFormat#getSplits

List<InputSplit> splits = input.getSplits(job);

T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

//对split数组元素进行排序,最大的是第一个

Arrays.sort(array, new SplitComparator());

//创建Split文件,这些个文件会存在提交路径的临时目录

JobSplitWriter.createSplitFiles(jobSubmitDir, conf,

jobSubmitDir.getFileSystem(conf), array);

return array.length;

}

public List<InputSplit> getSplits(JobContext job) throws IOException {

StopWatch sw = new StopWatch().start();

//根据mapreduce.input.fileinputformat.split.minsize配置和1取最大的

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

//根据mapreduce.input.fileinputformat.split.maxsize取最大的

long maxSize = getMaxSplitSize(job);

// generate splits

List<InputSplit> splits = new ArrayList<InputSplit>();

List<FileStatus> files = listStatus(job);

for (FileStatus file: files) {

//获取文件路径

Path path = file.getPath();

//获取文件大小

long length = file.getLen();

if (length != 0) {

BlockLocation[] blkLocations;

//从本地获取文件数据块位置

if (file instanceof LocatedFileStatus) {

blkLocations = ((LocatedFileStatus) file).getBlockLocations();

} else {//非本地文件,远程调用获取文件数据块信息

FileSystem fs = path.getFileSystem(job.getConfiguration());

blkLocations = fs.getFileBlockLocations(file, 0, length);

}

if (isSplitable(job, path)) {

//获取文件数据块大小,默认128M

long blockSize = file.getBlockSize();

//计算InputSplit大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

//将bytesRemaining(剩余未分片字节数)设置为整个文件的长度

long bytesRemaining = length;

/*

* 若剩余值bytesRemaining > 1.1 * splitSize,则继续对文件进行逻辑切分

* 若小于这个值,则作为一个InputSplit

*/

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

//计算文件的数据块的索引,只是计算InputSplit的起始位置是否位于某一块中

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

//然后将计算的索引位置作为参数计算切分的split文件,然后添加到split数组

splits.add(makeSplit(path, length-bytesRemaining, splitSize,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

/*

* 剩余字节数-splitSize,相当于下一次从这儿开始计算

* 我们也可以推断出起始位置为0,splitSize,2*splitSize,3*splitSize 等

*/

bytesRemaining -= splitSize;

}

//如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片

if (bytesRemaining != 0) {

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

}

} else { // not splitable

splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),

blkLocations[0].getCachedHosts()));

}

} else {

//Create empty hosts array for zero lengthfiles

splits.add(makeSplit(path, 0, length, new String[0]));

}

}

// Save the number of input files for metrics/loadgen

//设置mapreduce.input.fileinputformat.numinputfile值为输入文件数量

job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

sw.stop();

if (LOG.isDebugEnabled()) {

LOG.debug("Total# of splits generated by getSplits: " + splits.size()

+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));

}

return splits;

}

public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,

Configuration conf, FileSystem fs, T[] splits)

throws IOException, InterruptedException {

/*

* 创建切片文件,并获取FSDataOutputStream对应路径jobSubmitDir

* 届时就会生成${jobSubmitDir}/job.split文件

* jobSubmitDir:参数yarn.app.mapreduce.am.staging-dir

* 指定的路径

*/

FSDataOutputStream out = createFile(fs,

JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);

//将切片数据写入切片文件,并得到切片元数据信息数组

SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

out.close();

//将切片元数据信息写入切片元数据信息文件

writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),

new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,

info);

}

private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,

Configuration job)  throws IOException {

FSDataOutputStream out = FileSystem.create(fs, splitFile,

new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

//获取副本数,默认是10

int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);

fs.setReplication(splitFile, (short)replication);

//写入切片头信息

writeSplitHeader(out);

return out;

}

#遍历输入的文件

#获取文件数据块的位置以及文件数据块的大小(默认128m)

#计算分片的尺寸大小splitSize

#对文件数据块进行分片

#创建切片文件,写入头信息,文件位置位于提交job的路径

#将分片信息写入分片文件,并将得到的切片元数据信息写入切片元数据信息文件

1.2Map任务的决定因素

我们知道,map的个数是intmaps = writeSplits(job, submitJobDir);

这里产生的,也就是取决于切片数量。

那么切片数量又是由什么决定的呢?

>如果splitSize== blockSize(128M),那么只有一个切片

也就是一个Map 任务

>如果minSize超过blockSize,那么根据计算splitSize算法,会取128M和minSize中最大的,所以会减少分片数量,也就是会减少MapTask数量

>如果maxSize< blockSize,那么会选择之间比较小的然后跟minSize比较取较大者,那么这样这会增加分片数量,从而增加Map Task

总结:决定因素

# mapreduce.input.fileinputformat.split.minsize

# mapreduce.input.fileinputformat.split.maxsize

# blockSize

二 ReduceTask的决定因素

reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75 *(节点数* mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与 map任务一样,通过设定JobConf的conf.setNumReduceTasks(intnum)方法来增加任务个数。

map任务和reduce任务个数如何计算相关推荐

  1. 彻底明白Hadoop map和reduce的个数决定因素

    Hadoop map和reduce的个数设置,困扰了很多学习Hadoop的成员,为什么设置了配置参数就是不生效那?Hadoop Map和Reduce个数,到底跟什么有关系.首先他的参数很多,而且可能随 ...

  2. 初学者python笔记(map()函数、reduce()函数、filter()函数、匿名函数)

    文章目录 一.匿名函数 二.map()函数 三.reduce()函数 四.filter()函数 五.三大函数总结 本篇文章内容有Python中的匿名函数和map()函数.reduce()函数.filt ...

  3. hive如何确定map数量和reduce数量?

    因为Hive底层就是MR,所以问题实际是MR如何确定map数量和reduce数量. map数量 map数量 逻辑如下 map数量=split数量 split数量=文件大小/split size spl ...

  4. Python中的map()函数和reduce()函数的用法

    Python中的map()函数和reduce()函数的用法 这篇文章主要介绍了Python中的map()函数和reduce()函数的用法,代码基于Python2.x版本,需要的朋友可以参考下  

  5. hadoop 分片与分块,map task和reduce task的理解

    分块:Block HDFS存储系统中,引入了文件系统的分块概念(block),块是存储的最小单位,HDFS定义其大小为64MB.与单磁盘文件系统相似,存储在 HDFS上的文件均存储为多个块,不同的是, ...

  6. python中的map,feilter,和reduce函数

    python中的map,feilter,和reduce函数 map() map()的原型是map(function, iterable, -) 参数 function: 传的是一个函数名,可以是pyt ...

  7. Python Map, Filter and Reduce

    所属网站分类: python基础 > 函数 作者:慧雅 原文链接: http://www.pythonheidong.com/blog/article/21/ 来源:python黑洞网 www. ...

  8. CNN中feature map、卷积核、卷积核个数、filter、channel的概念解释,以及CNN 学习过程中卷积核更新的理解

    feature map.卷积核.卷积核个数.filter.channel的概念解释 feather map的理解 在cnn的每个卷积层,数据都是以三维形式存在的.你可以把它看成许多个二维图片叠在一起( ...

  9. Hive 设置map 和 reduce 的个数

    一.    控制hive任务中的map数: 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务.  主要的决定因素有: input的文件总个数,input的文件大小,集群设置 ...

最新文章

  1. 6_程序员最常用的快捷键的都在这里啦 (哈哈,我不是)(20181208)
  2. ui设计和python哪个容易学_软件开发和ui设计那个容易学?
  3. set集合判断集合中是否有无元素_集合 (Set) | 一山不容二虎的 Python 数据类型
  4. 百度搜索引擎优化指南3.0_深圳网站搜索引擎排名优化电话,百度优化排名费用_华阳网络...
  5. 《恋上数据结构第1季》二叉树代码实现
  6. Android Thing专题5 I2C
  7. 计算机基础(二):嵌入式驱动、图像处理知识设备小结
  8. @Transactional注解属性(1)
  9. html amp css设计与构建网站,HTMLCSS设计与构建网站 笔记CSS
  10. 微信公众平台开发进阶篇资源集锦
  11. 【洋哥聊运营】5点讲透增长
  12. java学科竞赛管理系统_《高校学科竞赛管理系统的web前端设计与实现》文献阅读随笔...
  13. 传感器 动态误差计算
  14. 光耦的介绍和常用参数
  15. java wait until_java调用ktr文件trans.waitUntilFinished()超时
  16. 内网通过映射后的公网IP访问内网服务测试--ASA842 hairpin NAT测试
  17. 文件指纹修改工具 Hash Modifier
  18. 判断素数的方法(孪生素数)
  19. 基于知识图谱的智能问答
  20. python 读文件 如何从第二行开始

热门文章

  1. 相量除法能用计算机吗,电路相量的加减乘除运算
  2. cadence快捷键修改文件_PCB快捷键设置
  3. android实践练习_android 练习之路 (四)
  4. 作为技术人员,经常遇到没有接触过的技术,有时是点滴的小技能,有时可能是大的一个研究课题,那么我们如何进行技术研究呢?
  5. ax.spines——matplotlib坐标轴设置
  6. oracle system表空间有坏块,修复系统表空间坏块
  7. mysql 错误代码 0_Linux平台MySQL5InnoDB系统错误代码0
  8. ext Grid(三)
  9. terminal登录mysql_转载-MySQL之终端(Terminal)管理MySQL
  10. 安装deepin_deepin使用笔记,安装steam客户端