mapTask并行度的决定机制

  一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。

FileInputFormat切片机制

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6733968.html

1、默认切片定义在InputFormat类中的getSplit()方法

2、FileInputFormat中默认的切片机制:

a) 简单地按照文件的内容长度进行切片

b) 切片大小,默认等于hdfs的block大小

c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt    260M
file2.txt    10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~260 //如果剩余的文件长度/切片长度<=1.1则会将剩余文件的长度并未一个切片
file2.txt.split1--  0~10M

3、FileInputFormat中切片的大小的参数配置

通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定。

minsize:默认值:1  配置参数: mapreduce.input.fileinputformat.split.minsize    maxsize:默认值:Long.MAXValue  配置参数:mapreduce.input.fileinputformat.split.maxsizeblocksize:值为hdfs的对应文件的blocksize

配置读取目录下文件数量的线程数:public static final String LIST_STATUS_NUM_THREADS =      "mapreduce.input.fileinputformat.list-status.num-threads";

因此,默认情况下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize

maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小。

minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

选择并发数的影响因素:

1、运算节点的硬件配置

2、运算任务的类型:CPU密集型还是IO密集型

3、运算任务的数据量

3、hadoop2.6.4源码解析

org.apache.hadoop.mapreduce.JobSubmitter类

   //得到job的map任务的并行数量private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;if (jConf.getUseNewMapper()) {maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}@SuppressWarnings("unchecked")private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();InputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go firstArrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}

切片计算逻辑,关注红色字体代码即可。

public List<InputSplit> getSplits(JobContext job) throws IOException {Stopwatch sw = new Stopwatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);   //遍历文件,对每一个文件进行如下处理:获得文件的blocksize,获取文件的长度,得到切片信息(spilt 文件路径,切片编号,偏移量范围)for (FileStatus file: files) {Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitablesplits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.elapsedMillis());}return splits;}

 public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//保证切分的文件长度最小不得小于1字节protected long getFormatMinSplitSize() {return 1;}//如果没有在conf中设置SPLIT_MINSIZE参数,则取默认值1字节。public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);}//得到切片文件的最大长度long maxSize = getMaxSplitSize(job);//如果没有在conf中设置SPLIT_MAXSIZE参数,则去默认值Long.MAX_VALUE字节。public static long getMaxSplitSize(JobContext context) {return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);}//读取指定目录下的所有文件的信息List<FileStatus> files = listStatus(job);//如果没有指定开启几个线程读取,则默认一个线程去读文件信息,因为存在目录下有上亿个文件的情况,所以有需要开启多个线程加快读取。int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,DEFAULT_LIST_STATUS_NUM_THREADS);public static final String LIST_STATUS_NUM_THREADS ="mapreduce.input.fileinputformat.list-status.num-threads";public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;//计算切片文件的逻辑大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}private static final double SPLIT_SLOP = 1.1;   // 10% slop//判断剩余文件与切片大小的比是否为1.1.while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}

map并行度

  如果job的每个map或者reduce的task的运行时间都只有30-40秒钟(最好每个map的执行时间最少不低于一分钟),那么就减少该job的map或者reduce数。每一个task的启动和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。

  配置task的JVM重用可以改善该问题:
  (mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM)。

小文件的场景下,默认的切片机制会造成大量的maptask处理很少量的数据,效率低下:

解决方案:

  推荐:把小文件存入hdfs之前进行预处理,先合并为大文件后再上传。

  折中:写程序对hdfs上小文件进行合并再跑job处理。

  补救措施:如果大量的小文件已经存在hdfs上了,使用combineInputFormate组件,它可以将众多的小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个maptask操作了。

转载于:https://www.cnblogs.com/intsmaze/p/6733968.html

MapReduce中map并行度优化及源码分析相关推荐

  1. THOR:MindSpore 自研高阶优化器源码分析和实践应用

    摘要:这篇文章跟大家分享下THOR的实践应用.THOR算法的部分内容当前已经在MindSpore中开源 本文分享自华为云社区<MindSpore 自研高阶优化器源码分析和实践应用>,原文作 ...

  2. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  3. WebRTC[1]-WebRTC中h264解码过程的源码分析

    目录 前言 正文 <WebRTC工作原理精讲>系列-总览_liuzhen007的专栏-CSDN博客_webrtc 原理前言欢迎大家订阅Data-Mining 的<WebRTC工作原理 ...

  4. 字节跳动Android三面视频解析:framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等

    前言 对于字节跳动的二面三面而言,Framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等问题都成高频问点!然而很多的朋友在面试时却答不上或者答不全!今天在这分享下这些 ...

  5. 【Java】NIO中Selector的select方法源码分析

    该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看[Java]NIO中Channel的注册源码分析, [Java]NIO中Selector的创建源码分析 Select ...

  6. 【java】java中的线程池 ThreadPoolExecutor源码分析

    文章目录 1.概述 4.源码 4.1 关键属性 4.2 构造函数 4.4 状态控制 4.5 ThreadLocalMap 4.6 execute方法源码分析 4.7 addWorker方法源码分析 4 ...

  7. Apache Mahout中推荐算法Slope one源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 关于推荐引擎 如今的互联网中,无论是电子商务还是社交网络,对数据挖掘的需求都越来越大了,而推荐引擎正是数据挖掘完美体现:通过分 ...

  8. 什么是cep算子_Flink中的CEP复杂事件处理 (源码分析)

    其实CEP复杂事件处理,简单来说你可以用通过类似正则表达式的方式去表示你的逻辑,表现能力非常的强,用过的人都知道 开篇先偷一张图,整体了解FlinkCEP中的  一种重要的图  NFA FlinkCE ...

  9. Java中的锁大全(底层源码分析)

    引用:https://tech.meituan.com/2018/11/15/java-lock.html 加锁过程:https://www.cnblogs.com/hkdpp/p/11917383. ...

最新文章

  1. blob二进制显示在html,使用Blob获取图片并二进制显示实例页面
  2. JAVA如何检测GC日志
  3. 开源Math.NET基础数学类库使用(13)C#实现其他随机数生成器
  4. w3wp进程发生死锁ISAPI aspnet
  5. 机器学习第八篇:详解逻辑斯蒂回归算法
  6. clear在CSS中的妙用
  7. 利用python实现批量查询ip地址归属地址
  8. 循环语句在c语言中的作用是什么,C语言中循环语句的使用
  9. POJ 1655:Balancing Act
  10. 新疆计算机证相关信息技术,2019新疆中小学教师计算机考试资料:信息技术课程基本理念...
  11. 谜底是计算机的谜语英语,英语谜语(Riddle)  谜底
  12. elementui带输入建议查询_知道Profiler是什么吗?带你了解SQL Server的性能优化工具...
  13. android RelativeLayout 动态添加子View
  14. 实现计算机和用户之间的关系,计算机系统概述
  15. html5人脸登录,基于HTML5 的人脸识别活体认证
  16. pygame安装超详细讲解
  17. 【复习】数学分析知识点梳理【思维导图】
  18. DSP2812之定时器
  19. Java ResourceBundle 加载外部路径资源文件方式
  20. 【node.js】报错Cannot mix different versions of joi schemas解决方法

热门文章

  1. 删除鼠标右键打开方式其他程序图标
  2. 银行交易系统 TiDB 在线缩容迁移
  3. maven中出现 ‘dependencies.dependency.version‘ for xxxx:jar is missing
  4. matlab读取文件与写入文件
  5. 线程死锁与共享变量的使用(转载)
  6. sublime unable to save 没有那个文件或者目录
  7. ubuntu下面的chrome浏览器增加标题栏
  8. oracle hr样本模式,Oracle 样本模式 HR
  9. springboot 项目依赖
  10. DWR的使用以及DWR中工具JS文件的使用