MapReduce中map并行度优化及源码分析
mapTask并行度的决定机制
一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。
FileInputFormat切片机制
原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6733968.html
1、默认切片定义在InputFormat类中的getSplit()方法
2、FileInputFormat中默认的切片机制:
a) 简单地按照文件的内容长度进行切片
b) 切片大小,默认等于hdfs的block大小
c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 260M file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~260 //如果剩余的文件长度/切片长度<=1.1则会将剩余文件的长度并未一个切片 file2.txt.split1-- 0~10M
3、FileInputFormat中切片的大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定。
minsize:默认值:1 配置参数: mapreduce.input.fileinputformat.split.minsize maxsize:默认值:Long.MAXValue 配置参数:mapreduce.input.fileinputformat.split.maxsizeblocksize:值为hdfs的对应文件的blocksize 配置读取目录下文件数量的线程数:public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads";
因此,默认情况下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize
maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。
选择并发数的影响因素:
1、运算节点的硬件配置
2、运算任务的类型:CPU密集型还是IO密集型
3、运算任务的数据量
3、hadoop2.6.4源码解析
org.apache.hadoop.mapreduce.JobSubmitter类
//得到job的map任务的并行数量private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;if (jConf.getUseNewMapper()) {maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}@SuppressWarnings("unchecked")private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();InputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go firstArrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}
切片计算逻辑,关注红色字体代码即可。
public List<InputSplit> getSplits(JobContext job) throws IOException {Stopwatch sw = new Stopwatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); //遍历文件,对每一个文件进行如下处理:获得文件的blocksize,获取文件的长度,得到切片信息(spilt 文件路径,切片编号,偏移量范围)for (FileStatus file: files) {Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitablesplits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.elapsedMillis());}return splits;}
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//保证切分的文件长度最小不得小于1字节protected long getFormatMinSplitSize() {return 1;}//如果没有在conf中设置SPLIT_MINSIZE参数,则取默认值1字节。public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);}//得到切片文件的最大长度long maxSize = getMaxSplitSize(job);//如果没有在conf中设置SPLIT_MAXSIZE参数,则去默认值Long.MAX_VALUE字节。public static long getMaxSplitSize(JobContext context) {return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);}//读取指定目录下的所有文件的信息List<FileStatus> files = listStatus(job);//如果没有指定开启几个线程读取,则默认一个线程去读文件信息,因为存在目录下有上亿个文件的情况,所以有需要开启多个线程加快读取。int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,DEFAULT_LIST_STATUS_NUM_THREADS);public static final String LIST_STATUS_NUM_THREADS ="mapreduce.input.fileinputformat.list-status.num-threads";public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;//计算切片文件的逻辑大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}private static final double SPLIT_SLOP = 1.1; // 10% slop//判断剩余文件与切片大小的比是否为1.1.while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}
map并行度
如果job的每个map或者reduce的task的运行时间都只有30-40秒钟(最好每个map的执行时间最少不低于一分钟),那么就减少该job的map或者reduce数。每一个task的启动和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。
小文件的场景下,默认的切片机制会造成大量的maptask处理很少量的数据,效率低下:
解决方案:
推荐:把小文件存入hdfs之前进行预处理,先合并为大文件后再上传。
折中:写程序对hdfs上小文件进行合并再跑job处理。
补救措施:如果大量的小文件已经存在hdfs上了,使用combineInputFormate组件,它可以将众多的小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个maptask操作了。
转载于:https://www.cnblogs.com/intsmaze/p/6733968.html
MapReduce中map并行度优化及源码分析相关推荐
- THOR:MindSpore 自研高阶优化器源码分析和实践应用
摘要:这篇文章跟大家分享下THOR的实践应用.THOR算法的部分内容当前已经在MindSpore中开源 本文分享自华为云社区<MindSpore 自研高阶优化器源码分析和实践应用>,原文作 ...
- netty中的future和promise源码分析(二)
前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...
- WebRTC[1]-WebRTC中h264解码过程的源码分析
目录 前言 正文 <WebRTC工作原理精讲>系列-总览_liuzhen007的专栏-CSDN博客_webrtc 原理前言欢迎大家订阅Data-Mining 的<WebRTC工作原理 ...
- 字节跳动Android三面视频解析:framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等
前言 对于字节跳动的二面三面而言,Framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等问题都成高频问点!然而很多的朋友在面试时却答不上或者答不全!今天在这分享下这些 ...
- 【Java】NIO中Selector的select方法源码分析
该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看[Java]NIO中Channel的注册源码分析, [Java]NIO中Selector的创建源码分析 Select ...
- 【java】java中的线程池 ThreadPoolExecutor源码分析
文章目录 1.概述 4.源码 4.1 关键属性 4.2 构造函数 4.4 状态控制 4.5 ThreadLocalMap 4.6 execute方法源码分析 4.7 addWorker方法源码分析 4 ...
- Apache Mahout中推荐算法Slope one源码分析
2019独角兽企业重金招聘Python工程师标准>>> 关于推荐引擎 如今的互联网中,无论是电子商务还是社交网络,对数据挖掘的需求都越来越大了,而推荐引擎正是数据挖掘完美体现:通过分 ...
- 什么是cep算子_Flink中的CEP复杂事件处理 (源码分析)
其实CEP复杂事件处理,简单来说你可以用通过类似正则表达式的方式去表示你的逻辑,表现能力非常的强,用过的人都知道 开篇先偷一张图,整体了解FlinkCEP中的 一种重要的图 NFA FlinkCE ...
- Java中的锁大全(底层源码分析)
引用:https://tech.meituan.com/2018/11/15/java-lock.html 加锁过程:https://www.cnblogs.com/hkdpp/p/11917383. ...
最新文章
- blob二进制显示在html,使用Blob获取图片并二进制显示实例页面
- JAVA如何检测GC日志
- 开源Math.NET基础数学类库使用(13)C#实现其他随机数生成器
- w3wp进程发生死锁ISAPI aspnet
- 机器学习第八篇:详解逻辑斯蒂回归算法
- clear在CSS中的妙用
- 利用python实现批量查询ip地址归属地址
- 循环语句在c语言中的作用是什么,C语言中循环语句的使用
- POJ 1655:Balancing Act
- 新疆计算机证相关信息技术,2019新疆中小学教师计算机考试资料:信息技术课程基本理念...
- 谜底是计算机的谜语英语,英语谜语(Riddle) 谜底
- elementui带输入建议查询_知道Profiler是什么吗?带你了解SQL Server的性能优化工具...
- android RelativeLayout 动态添加子View
- 实现计算机和用户之间的关系,计算机系统概述
- html5人脸登录,基于HTML5 的人脸识别活体认证
- pygame安装超详细讲解
- 【复习】数学分析知识点梳理【思维导图】
- DSP2812之定时器
- Java ResourceBundle 加载外部路径资源文件方式
- 【node.js】报错Cannot mix different versions of joi schemas解决方法
热门文章
- 删除鼠标右键打开方式其他程序图标
- 银行交易系统 TiDB 在线缩容迁移
- maven中出现 ‘dependencies.dependency.version‘ for xxxx:jar is missing
- matlab读取文件与写入文件
- 线程死锁与共享变量的使用(转载)
- sublime unable to save 没有那个文件或者目录
- ubuntu下面的chrome浏览器增加标题栏
- oracle hr样本模式,Oracle 样本模式 HR
- springboot 项目依赖
- DWR的使用以及DWR中工具JS文件的使用