• reduce task数量的决定机制

1、业务逻辑需要

2、数据量大小

设置方法:

job.setNumReduceTasks(5)

  • map task数量的决定机制

由于map task之间没有协作关系,每一个map task都是各自为政,在map task的处理中没法做“全局”性的聚合操作,所以map task的数量完全取决于所处理的数据量的大小

决定机制:

对待处理数据进行“切片”

每一个切片分配一个map task来处理

Mapreduce框架中默认的切片机制:

TextInputFormat.getSplits()继承自FileInputFormat.getSplits()

1:定义一个切片大小:可以通过参数来调节,默认情况下等于“hdfs中设置的blocksize”,通常是128M

2:获取输入数据目录下所有待处理文件List

3:遍历文件List,逐个逐个文件进行切片

for(file:List)

对file从0偏移量开始切,每到128M就构成一个切片,比如a.txt(200M),就会被切成两个切片:   a.txt: 0-128M,  a.txt :128M-256M

再比如b.txt(80M),就会切成一个切片, b.txt :0-80M

  • 如果要处理的数据是大量的小文件,使用上述这种默认切片机制,就会导致大量的切片,从而maptask进程数特别多,但是每一个切片又非常小,每个maptask的处理数据量就很小,从而,整体的效率会很低。

通用解决方案:就是将多个小文件划分成一个切片;实现办法就是自定义一个Inputformat子类重写里面的getSplits方法;

Mapreduce框架中自带了一个用于此场景的Inputformat实现类:CombineFileInputformat

数据切片与map任务数的机制

示例观察(多文件,大文件)

源码跟踪

TextInputFormat源码阅读

isSplitable() 判断要处理的数据是否可以做切片

getSplit()  规划切片信息(实现在FileInputFormat类中)

----TextInputformat切片逻辑: 对每一个文件单独切片;切片大小默认等于blocksize

但是有两个参数可以调整:

如果是大量小文件,这种切片逻辑会有重大弊端:切片数量太多,maptask太多

createRecordReader()  构造一个记录读取器

具体读取数据的逻辑是实现在LineRecordReader中 (按行读取数据,行起始偏移量作为key,行的内容作为value),比较特别的地方是:

LineRecordReader在读取一个具体的切片时,总是忽略掉第一行(针对的是:非第一切片),总是跨split多读一行(针对的是:非最末切片)

  • InputFormat的继承体系

InputFormat子类介绍:

(1)TextInputFormat(默认的输入格式类)详解

-- 源码结构 getsplits()  reader

-- 为何不会出现一行被割断处理的原理

  • 在LineRecordReader中,对split的第一行忽略
  public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();… ………..// open the file and seek to the start of the splitfinal FileSystem fs = file.getFileSystem(job);fileIn = fs.open(file);CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);if (null!=codec) {… … … …
//我们总是将第一条记录抛弃(文件第一个split除外)
//因为我们总是在nextKeyValue ()方法中跨split多读了一行(文件最后一个split除外)if (start != 0) {start += in.readLine(new Text(), 0, maxBytesToConsume(start));}this.pos = start;}
  • 在LineRecordReader中,nextKeyValue ()方法总是跨split多读一行
public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;// 使用<=来多读取一行while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {newSize = in.readLine(value, maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));pos += newSize;if (newSize < maxLineLength) {break;…. …. }
  1. CombineTextInputFormat

它的切片逻辑跟TextInputformat完全不同:

CombineTextInputFormat可以将多个小文件划为一个切片

这种机制在处理海量小文件的场景下能提高效率

(小文件处理的机制,最优的是将小文件先合并再处理)

思路

CombineFileInputFormat涉及到三个重要的属性:

mapred.max.split.size:同一节点或同一机架的数据块形成切片时,切片大小的最大值;

mapred.min.split.size.per.node:同一节点的数据块形成切片时,切片大小的最小值;

mapred.min.split.size.per.rack:同一机架的数据块形成切片时,切片大小的最小值。

切片形成过程:

(1)逐个节点(数据块)形成切片;

a.遍历并累加这个节点上的数据块,如果累加数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

b.如果剩余数据块累加大小大于或等于mapred.min.split.size.per.node,则将这些剩余数据块形成一个切片,如果剩余数据块累加大小小于mapred.min.split.size.per.node,则这些数据块留待后续处理。

(2)逐个机架(数据块)形成切片;

a.遍历并累加这个机架上的数据块(这些数据块即为上一步遗留下来的数据块),如果累加数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

b.如果剩余数据块累加大小大于或等于mapred.min.split.size.per.rack,则将这些剩余数据块形成一个切片,如果剩余数据块累加大小小于mapred.min.split.size.per.rack,则这些数据块留待后续处理。

(3)遍历并累加剩余数据块,如果数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

(4)剩余数据块形成一个切片。

核心实现

// mapping from a rack name to the list of blocks it has
HashMap<String,List<OneBlockInfo>> rackToBlocks =
new HashMap<String,List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo,String[]> blockToNodes =
new HashMap<OneBlockInfo,String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String,List<OneBlockInfo>> nodeToBlocks =
new HashMap<String,List<OneBlockInfo>>();

开始形成切片之前,需要初始化三个重要的映射关系:

rackToBlocks:机架和数据块的对应关系,即某一个机架上有哪些数据块;

blockToNodes:数据块与节点的对应关系,即一块数据块的“拷贝”位于哪些节点;

nodeToBlocks:节点和数据块的对应关系,即某一个节点上有哪些数据块;

初始化过程如下代码所示,其中每一个Path代表的文件被形成一个OneFileInfo对象,映射关系也在形成OneFileInfo的过程中被维护。

// populate all the blocks for all fileslong totLength = 0;
for (int i = 0; i < paths.length; i++) {files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);totLength += files[i].getLength();
}
  1. 逐个节点(数据块)形成切片,代码如下:
// 保存当前切片所包含的数据块ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();// 保存当前切片中的数据块属于哪些节点ArrayList<String> nodes = new ArrayList<String>();// 保存当前切片的大小long curSplitSize = 0;// process all nodes and create splits that arelocalto a node. // 依次处理每个节点上的数据块for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) {Map.Entry<String, List<OneBlockInfo>> one = iter.next();nodes.add(one.getKey());List<OneBlockInfo> blocksInNode = one.getValue();// for each block, copy it into validBlocks. Delete it from blockToNodes so that the same block does not appear in// two different splits.// 依次处理每个数据块,注意blockToNodes变量的作用,它保证了同一数据块不会出现在两个切片中for (OneBlockInfo oneblock : blocksInNode) {if (blockToNodes.containsKey(oneblock)) {validBlocks.add(oneblock);blockToNodes.remove(oneblock);curSplitSize += oneblock.length;// if the accumulated split size exceeds the maximum, then create this split.// 如果数据块累积大小大于或等于maxSize,则形成一个切片if (maxSize != 0 && curSplitSize >= maxSize) {//create an input split andadd it to the splits array            addCreatedSplit(job, splits, nodes, validBlocks);curSplitSize = 0;validBlocks.clear();}}}// if there were any blocks left over and their combined size is// larger than minSplitNode, then combine them into one split.// Otherwise add them back to the unprocessed pool. It is likely // that they will be combined with other blocks from the same rack later on.// 如果剩余数据块大小大于或等于minSizeNode,则将这些数据块构成一个切片;// 如果剩余数据块大小小于minSizeNode,则将这些数据块归还给blockToNodes,交由后期“同一机架”过程处理if (minSizeNode != 0 && curSplitSize >= minSizeNode) {//create an input split andadd it to the splits array        addCreatedSplit(job, splits, nodes, validBlocks);} else {for (OneBlockInfo oneblock : validBlocks) {blockToNodes.put(oneblock, oneblock.hosts);}}validBlocks.clear();nodes.clear();curSplitSize = 0;}

(2)逐个机架(数据块)形成切片,代码如下:

// if blocks in a rack are below the specified minimum size, then keep them// in 'overflow'. After the processing of all racks is complete, these overflow// blocks will be combined into splits.// overflowBlocks用于保存“同一机架”过程处理之后剩余的数据块ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();ArrayList<String> racks = new ArrayList<String>();// Process all racks over and over again until there is no more work to do.while (blockToNodes.size() > 0) {//Create one split for this rack before moving over to the next rack. // Come back to this rack after creating a single split for each of the // remaining racks.// Process one rack location at a time, Combine all possible blocks that// reside on this rack as one split. (constrained by minimum and maximum// split size).// iterate over all racks // 依次处理每个机架for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks.entrySet().iterator(); iter.hasNext();) {Map.Entry<String, List<OneBlockInfo>> one = iter.next();racks.add(one.getKey());List<OneBlockInfo> blocks = one.getValue();// for each block, copy it into validBlocks. Delete it from// blockToNodes so that the same block does not appear in// two different splits.boolean createdSplit = false;// 依次处理该机架的每个数据块for (OneBlockInfo oneblock : blocks) {if (blockToNodes.containsKey(oneblock)) {validBlocks.add(oneblock);blockToNodes.remove(oneblock);curSplitSize += oneblock.length;// if the accumulated split size exceeds the maximum, then create this split.// 如果数据块累积大小大于或等于maxSize,则形成一个切片if (maxSize != 0 && curSplitSize >= maxSize) {//create an input split andadd it to the splits array              addCreatedSplit(job, splits, getHosts(racks), validBlocks);createdSplit = true;break;}}}// if we created a split, then just go to the next rackif (createdSplit) {curSplitSize = 0;validBlocks.clear();racks.clear();continue;}if (!validBlocks.isEmpty()) {// 如果剩余数据块大小大于或等于minSizeRack,则将这些数据块构成一个切片if (minSizeRack != 0 && curSplitSize >= minSizeRack) {// if there is a mimimum size specified, then create a single split// otherwise, store these blocks into overflow data structure            addCreatedSplit(job, splits, getHosts(racks), validBlocks);} else {// There were a few blocks in this rack that remained to be processed.// Keep them in 'overflow' block list. These will be combined later.// 如果剩余数据块大小小于minSizeRack,则将这些数据块加入overflowBlocks            overflowBlocks.addAll(validBlocks);}}curSplitSize = 0;validBlocks.clear();racks.clear();}}

(3)遍历并累加剩余数据块,代码如下:

// Process all overflow blocksfor (OneBlockInfo oneblock : overflowBlocks) {validBlocks.add(oneblock); curSplitSize += oneblock.length;// This might cause an exiting rack location to be re-added,
// but it should be ok.for (int i = 0; i < oneblock.racks.length; i++) {racks.add(oneblock.racks[i]); }
// if the accumulated split size exceeds the maximum, then
//create this split.
// 如果剩余数据块大小大于或等于maxSize,则将这些数据块构成一个切片if (maxSize != 0 && curSplitSize >= maxSize) {
//create an input split andadd it to the splits array   addCreatedSplit(job, splits, getHosts(racks), validBlocks);   curSplitSize = 0;validBlocks.clear();racks.clear();}}

(4)剩余数据块形成一个切片,代码如下:

// Process any remaining blocks, if any.if (!validBlocks.isEmpty()) {addCreatedSplit(job, splits, getHosts(racks), validBlocks);}

总结

CombineFileInputFormat形成切片过程中考虑数据本地性(同一节点、同一机架),首先处理同一节点的数据块,然后处理同一机架的数据块,最后处理剩余的数据块,可见本地性是逐步减弱的。另外CombineFileInputFormat是抽象的,具体使用时需要自己实现getRecordReader方法。

(3)SequenceFileInputFormat/SequenceFileOutputFormat

sequenceFile是hadoop中非常重要的一种数据格式

sequenceFile文件内部的数据组织形式是:K-V对

读入/写出为hadoop序列文件

Hadoop大数据--Mapreduce程序运行并发度相关推荐

  1. hadoop大数据——mapreduce程序提交运行模式及debug方法

    本地运行模式 (1)mapreduce程序是被提交给LocalJobRunner在本地运行 (2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上 怎样实现本地运行?:写一个程序,不要带集 ...

  2. Hadoop大数据——mapreduce的排序机制之total排序

    mapreduce的排序机制之total排序 (1)设置一个reduce task ,全局有序,但是并发度太低,单节点负载太大 (2)设置分区段partitioner,设置相应数量的reduce ta ...

  3. Hadoop大数据--Mapreduce编程规范及入门示例

    Mapreduce是一个分布式的运算编程框架,核心功能是将用户编写的核心逻辑代码分布式地运行在一个集群的很多服务器上. Mapreduce的存在价值 (1)海量数据在单机上处理因为硬件资源限制,无法胜 ...

  4. Hadoop大数据——mapreduce的join算法

    (1)Reduce side join 示例: 订单数据 商品信息 实现机制: 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce ...

  5. Hadoop大数据——mapreduce中的Combiner/序列化/排序初步

    mapreduce中的Combiner (1)combiner是MR程序中Mapper和Reducer之外的一种组件 (2)combiner组件的父类就是Reducer (3)Combiner和red ...

  6. Hadoop大数据——mapreduce的secondary排序机制

    secondary排序机制 ----就是让mapreduce帮我们根据value排序 考虑一个场景,需要取按key分组的最大value条目: 通常,shuffle只是对key进行排序 如果需要对val ...

  7. Hadoop大数据——mapreduce的Distributed cache

    应用场景:map side join 工作原理: 通过mapreduce框架将一个文件(本地/HDFS)分发到每一个运行时的task(map task /reduce task)节点上(放到task进 ...

  8. Hadoop大数据——MR程序map任务数的规划机制

    一个inputsplit对应一个map 而inputsplit切片规划是由InputFormat的具体实现子类来实现,就是调用 InputSplits[ ] getSplits() 方法,这个方法的逻 ...

  9. Hadoop大数据从入门到精通-任亮-专题视频课程

    Hadoop大数据从入门到精通-48021人已学习 课程介绍         Hadoop分布式文件系统(HDFS)和MapReduce的工作原理 如何优化Hadoop机群所需要的硬件配置 搭建Had ...

最新文章

  1. 负载均衡策略深入剖析
  2. php_mongo.dll下载(php操作mongoDB需要)
  3. 编译测试后出现“发现不明确的匹配”错误
  4. 命名规范(1)大小写约定
  5. <深入剖析Tomcat>摘抄
  6. Log4j.properties 配置详解
  7. Identity Server 4 - Hybrid Flow - 使用ABAC保护MVC客户端和API资源
  8. 2.9 穆尔彭罗斯伪逆
  9. php 检测密码,php 判断密码是否简单
  10. TensorFlow实现深度学习算法的教程汇集:代码+笔记
  11. DELL R430服务器做raid5以及安装操作系统过程
  12. Java泛型报错的解决办法
  13. date和datetime长度设置多少_太原市玻璃温室大棚多少钱
  14. 网络信息安全及常见数据加密技术
  15. Audition人声美化
  16. 关于QComboBox
  17. Centos7安装sqliteman
  18. iOS 开发:知识地图(不定期更新)
  19. 利用HISTFILESIZE和HISTSIZE在ubunutu中调整命令行History的Size
  20. Emscripten 单词_学会词根词缀,开启高效、快速地记忆英语单词模式

热门文章

  1. 【论文相关】盘点AAAI2020中的四篇推荐系统好文
  2. 【机器学习基础】机器学习训练中常见的问题和挑战!
  3. github标星11600+:最全的吴恩达机器学习课程资源(完整笔记、中英文字幕视频、python作业,提供百度云镜像!)...
  4. 斯坦福大学机器学习课程资料-吴恩达老师主讲(2008版)
  5. ThreadLocal 详解
  6. Dojo 如何测试 widget
  7. 应该允许公司报复黑客吗?
  8. mysql中json_merge函数的使用?
  9. windows下安装PyTorch0.4.0
  10. VMware 当中出现:无法将 Ethernet0 连接到虚拟网络VMnet8的问题