map任务和reduce任务个数如何计算
一 MapTask个数的决定因素
首先,我们需要明确以下几点:
1Map Task个数不能通过配置文件指定
2Map Task个数是在进行文件的切分时动态计算的
3FileInputFormat负责切分文件进行split操作
1.1分析源码:
intmaps = writeSplits(job, submitJobDir);
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException,ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
//判断是否采用新的API,现在我们应该都是新的
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException,ClassNotFoundException {
Configuration conf = job.getConfiguration();
//创建FileInputFormat
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
//调用FileInputFormat#getSplits
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
//对split数组元素进行排序,最大的是第一个
Arrays.sort(array, new SplitComparator());
//创建Split文件,这些个文件会存在提交路径的临时目录
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
//根据mapreduce.input.fileinputformat.split.minsize配置和1取最大的
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//根据mapreduce.input.fileinputformat.split.maxsize取最大的
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
//获取文件路径
Path path = file.getPath();
//获取文件大小
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
//从本地获取文件数据块位置
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {//非本地文件,远程调用获取文件数据块信息
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
//获取文件数据块大小,默认128M
long blockSize = file.getBlockSize();
//计算InputSplit大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//将bytesRemaining(剩余未分片字节数)设置为整个文件的长度
long bytesRemaining = length;
/*
* 若剩余值bytesRemaining > 1.1 * splitSize,则继续对文件进行逻辑切分
* 若小于这个值,则作为一个InputSplit
*/
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
//计算文件的数据块的索引,只是计算InputSplit的起始位置是否位于某一块中
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
//然后将计算的索引位置作为参数计算切分的split文件,然后添加到split数组
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
/*
* 剩余字节数-splitSize,相当于下一次从这儿开始计算
* 我们也可以推断出起始位置为0,splitSize,2*splitSize,3*splitSize 等
*/
bytesRemaining -= splitSize;
}
//如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero lengthfiles
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
//设置mapreduce.input.fileinputformat.numinputfile值为输入文件数量
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total# of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
Configuration conf, FileSystem fs, T[] splits)
throws IOException, InterruptedException {
/*
* 创建切片文件,并获取FSDataOutputStream对应路径jobSubmitDir
* 届时就会生成${jobSubmitDir}/job.split文件
* jobSubmitDir:参数yarn.app.mapreduce.am.staging-dir
* 指定的路径
*/
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
//将切片数据写入切片文件,并得到切片元数据信息数组
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
out.close();
//将切片元数据信息写入切片元数据信息文件
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
Configuration job) throws IOException {
FSDataOutputStream out = FileSystem.create(fs, splitFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
//获取副本数,默认是10
int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
fs.setReplication(splitFile, (short)replication);
//写入切片头信息
writeSplitHeader(out);
return out;
}
#遍历输入的文件
#获取文件数据块的位置以及文件数据块的大小(默认128m)
#计算分片的尺寸大小splitSize
#对文件数据块进行分片
#创建切片文件,写入头信息,文件位置位于提交job的路径
#将分片信息写入分片文件,并将得到的切片元数据信息写入切片元数据信息文件
1.2Map任务的决定因素
我们知道,map的个数是intmaps = writeSplits(job, submitJobDir);
这里产生的,也就是取决于切片数量。
那么切片数量又是由什么决定的呢?
>如果splitSize== blockSize(128M),那么只有一个切片
也就是一个Map 任务
>如果minSize超过blockSize,那么根据计算splitSize算法,会取128M和minSize中最大的,所以会减少分片数量,也就是会减少MapTask数量
>如果maxSize< blockSize,那么会选择之间比较小的然后跟minSize比较取较大者,那么这样这会增加分片数量,从而增加Map Task
总结:决定因素
# mapreduce.input.fileinputformat.split.minsize
# mapreduce.input.fileinputformat.split.maxsize
# blockSize
二 ReduceTask的决定因素
reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75 *(节点数* mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与 map任务一样,通过设定JobConf的conf.setNumReduceTasks(intnum)方法来增加任务个数。
map任务和reduce任务个数如何计算相关推荐
- 彻底明白Hadoop map和reduce的个数决定因素
Hadoop map和reduce的个数设置,困扰了很多学习Hadoop的成员,为什么设置了配置参数就是不生效那?Hadoop Map和Reduce个数,到底跟什么有关系.首先他的参数很多,而且可能随 ...
- 初学者python笔记(map()函数、reduce()函数、filter()函数、匿名函数)
文章目录 一.匿名函数 二.map()函数 三.reduce()函数 四.filter()函数 五.三大函数总结 本篇文章内容有Python中的匿名函数和map()函数.reduce()函数.filt ...
- hive如何确定map数量和reduce数量?
因为Hive底层就是MR,所以问题实际是MR如何确定map数量和reduce数量. map数量 map数量 逻辑如下 map数量=split数量 split数量=文件大小/split size spl ...
- Python中的map()函数和reduce()函数的用法
Python中的map()函数和reduce()函数的用法 这篇文章主要介绍了Python中的map()函数和reduce()函数的用法,代码基于Python2.x版本,需要的朋友可以参考下
- hadoop 分片与分块,map task和reduce task的理解
分块:Block HDFS存储系统中,引入了文件系统的分块概念(block),块是存储的最小单位,HDFS定义其大小为64MB.与单磁盘文件系统相似,存储在 HDFS上的文件均存储为多个块,不同的是, ...
- python中的map,feilter,和reduce函数
python中的map,feilter,和reduce函数 map() map()的原型是map(function, iterable, -) 参数 function: 传的是一个函数名,可以是pyt ...
- Python Map, Filter and Reduce
所属网站分类: python基础 > 函数 作者:慧雅 原文链接: http://www.pythonheidong.com/blog/article/21/ 来源:python黑洞网 www. ...
- CNN中feature map、卷积核、卷积核个数、filter、channel的概念解释,以及CNN 学习过程中卷积核更新的理解
feature map.卷积核.卷积核个数.filter.channel的概念解释 feather map的理解 在cnn的每个卷积层,数据都是以三维形式存在的.你可以把它看成许多个二维图片叠在一起( ...
- Hive 设置map 和 reduce 的个数
一. 控制hive任务中的map数: 1. 通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置 ...
最新文章
- 6_程序员最常用的快捷键的都在这里啦 (哈哈,我不是)(20181208)
- ui设计和python哪个容易学_软件开发和ui设计那个容易学?
- set集合判断集合中是否有无元素_集合 (Set) | 一山不容二虎的 Python 数据类型
- 百度搜索引擎优化指南3.0_深圳网站搜索引擎排名优化电话,百度优化排名费用_华阳网络...
- 《恋上数据结构第1季》二叉树代码实现
- Android Thing专题5 I2C
- 计算机基础(二):嵌入式驱动、图像处理知识设备小结
- @Transactional注解属性(1)
- html amp css设计与构建网站,HTMLCSS设计与构建网站 笔记CSS
- 微信公众平台开发进阶篇资源集锦
- 【洋哥聊运营】5点讲透增长
- java学科竞赛管理系统_《高校学科竞赛管理系统的web前端设计与实现》文献阅读随笔...
- 传感器 动态误差计算
- 光耦的介绍和常用参数
- java wait until_java调用ktr文件trans.waitUntilFinished()超时
- 内网通过映射后的公网IP访问内网服务测试--ASA842 hairpin NAT测试
- 文件指纹修改工具 Hash Modifier
- 判断素数的方法(孪生素数)
- 基于知识图谱的智能问答
- python 读文件 如何从第二行开始
热门文章
- 相量除法能用计算机吗,电路相量的加减乘除运算
- cadence快捷键修改文件_PCB快捷键设置
- android实践练习_android 练习之路 (四)
- 作为技术人员,经常遇到没有接触过的技术,有时是点滴的小技能,有时可能是大的一个研究课题,那么我们如何进行技术研究呢?
- ax.spines——matplotlib坐标轴设置
- oracle system表空间有坏块,修复系统表空间坏块
- mysql 错误代码 0_Linux平台MySQL5InnoDB系统错误代码0
- ext Grid(三)
- terminal登录mysql_转载-MySQL之终端(Terminal)管理MySQL
- 安装deepin_deepin使用笔记,安装steam客户端