Maptask调用一个组件FileInputFormat
FileInputFormat有一个最高层的接口 --> InputFormat
我们不需要去写自己的实现类,使用的就是内部默认的组件:TextInputFormat

maptask先调用TextInputFormat, 但是实质读数据是TextInputFormat调用RecordReader。 RecordReader 是一个接口,这个接口的实现类调用read方法去读取数据。

InputFormat和RecordReader:

org.apache.hadoop.mapreduce包里的InputFormat抽象类提供了如下列代码所示的两个方法:

public abstract class InputFormat<K, V> {public abstract List<InputSplit> getSplits(JobContext context);RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}

这两个方法展示了InputFormat类的两个功能:

  • 将输入文件切分为map处理所需的split
  • 创建RecordReader类,它将从一个split生成键值对序列

RecordReader类同样也是org.apache.hadoop.mapreduce包里的抽象类

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {/** 初始化RecordReader,只能被调用一次。*/public abstract void initialize(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;/*** 获取下一个数据的键值对*/public abstract boolean nextKeyValue() throws IOException, InterruptedException;/*** 获取当前数据的 key*/public abstractKEYIN getCurrentKey() throws IOException, InterruptedException;/*** 获取当前数据的value*/public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;/***  进度*/public abstract float getProgress() throws IOException, InterruptedException;/*** 关闭RecordReader*/public abstract void close() throws IOException;
}

组合使用InputFormat和RecordReader可以将任何类型的输入数据转换为MapReduce所需的键值对。

下面有一篇文章可以参考:
https://www.cnblogs.com/xuepei/p/3664698.html

InputFormat:
通过使用InputFormat,MapReduce框架可以做到:

  • 1、验证作业的输入的正确性

  • 2、将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mapper task

  • 3、提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对供Mapper使用。

maptask会将返回的 key - value 交给自定义的Mapper。如果没有自定义的Mapper就是使用默认的Mapper。这个默认的Mapper的map处理方法,就是进来什么就输出什么。

map方法的参数有 key - value ,所以maptask会将InputFormat返回的key - value交给Mapper的map方法去使用。

内部还有一个组件 OutputCollector:
OutputCollector 由hadoop框架提供,复制收集Mapper和Reducer的输出数据,实现map或者reduce函数时,只需要简单地将其输出的<key,value>对往OutputCollector中一丢即可,剩余的框架会帮你做好。

这个收集器会将数据输出到一个环形缓冲区,其实就是一个数组,(一边写数据,一边会对数据进行回收,通过索引来控制,相当于一个环一样)

环形缓冲区其实就是一个数组,后端不断接收数据的同时,前端数据不断溢出,长度用完之后读取的新数据再从前端开始覆盖。这个缓冲区的默认大小是100M – 这个环形缓冲区是内存里面的,速度很快,但是容量有限。-- 这个100M是不会全部被使用的,因为还需要内存进行排序等操作。 这里面有一个保留区
操作80%就会溢出

spiller组件会从环形缓冲区溢出文件,这过程会按照定义的partitioner分区(默认是hashpartition),并且按照key.compareTo()进行排序(底层主要是快排和外部排序)
spiller会在环形缓冲区溢出的时候,对数据进行分区和排序, – (spiller会会环形缓冲区这段内存进行操作)
分区是根据key,这个分区需要调用一个组件 Partitioner - 默认的实现是 HashPartitioner.
排序的时候需要调用被排序对象的compareTo()方法。

这个过程就是将数据分好区,每个区都是排好序的。
分区排序之后,就可以将文件往外写。一个区一个区往外写。 内存中的数据写到文件中。这个文件就在maptask的工作目录里面。就在本地。
每一次溢出就会产生一个文件。

如果内存里面还剩下最后的一点点数据,那也需要溢出。

这些溢出的文件不会一个一个地交给reduce去处理,还会先进行一次合并。合并完之后会形成一个大文件。这个merge使用的是归并排序。

merge之后就会将数据全部交给reduce来进行处理:

reduce task会去下载数据(通过网络从传输)

reduce task 从 map task 机器上下载下来的数据是没有排序的,还需要进行再一次地合并。

问题: reduce task 拿到这个文件是怎么处理的?
reducer 的 reduce方法是处理业务逻辑的,会根据key来处理,相同的key就调用一次reduce() 方法。调用 GroupingComparator()方法



完整的图:


Combainer

Hadoop矿建使用Mapper将数据处理成一个个<key,value>键值对,在网络节点间对其进行整理(shuffle),然后使用Reduce处理数据并进行最后的输出。

这时会出现性能上的瓶颈:
(1)如果我们有10亿个数据,Mapper会产生10亿个键值对在网络间进行传输,但是如果我们对数据求最大值,那么很明显的Mapper值需要输出它所知道的最大值即可。这样不仅可以减少轻网络压力,同样也可以大幅度提高程序效率。
总结:网络带宽严重被占降低效率。
(2)有时候数据远远不是一致性的或者说是平衡分布的,这样不仅Mapper中的键值对,中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于单一的Reducer上,压倒这个Reducer,从而大大降低程序的性能。
总结:单一节点承载过重降低程序性能。


在MapperReducer编程模型中,在Mapper和Reducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner.

注意点:

 (1)与mapper和reducer不同的是,combiner没有默认的实现,需要显式设置早conf中才有作用,(2)并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。

每一个mapper都可能产生大量的本地输出,Combiner额作用就是对map端的输出先做一次合并,减少在map和reduce节点之间的数据传输,以提高网络IO性能,是MapReduce的一种优化手段之一,其具体作用如下所述:

自定义的Combiner其实就是跟Reducer一样的代码:

package com.thp.bigdata.wcdemo;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** 自定义的Combiner* @author tommy**/
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//System.out.println("Combiner输入分组<" + key.toString() + ",N(N>=1)>");int count = 0;for(IntWritable value : values) {count += value.get();  // 这个count 最后就是某一个单词的汇总的值// 显示次数表示输入的k2,v2的键值对数量System.out.println("Combiner输入键值对<" + key.toString() + "," + value.get() + ">");}context.write(key, new IntWritable(count));System.out.println("Combiner输出键值对<" + key.toString() + "," + count+ ">");}}

还需要将这个自定义的Combiner添加进去:

// 设置Map规约Combinerjob.setCombinerClass(MyCombiner.class);

我每一个文件中就有8个hello,这个8个hello如果使用了combiner会先进行一次合并:

没使用之前:

Reducer输入分组<hello,N(N>=1)>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>
Reducer输入键值对<hello,1>

reduce会有56次输入,但是如果使用了combiner的话,每一次map之后,就会进行换一次合并:

接下来会有一个combiner进行一次合并:

第一个文件中的8个hello就进行了一次合并:
每一个文件都会有这个combiner的过程,最后会有reduce的过程进行汇总:

这个reduce过程对hello进行统计只有7次输入,每一次输入的数据都是8次,是combiner先进行了一次合并。

总结:
从控制台的信息我们可以发现,其实combiner只是把相同的hello进行规约,由此输入给reduce的就变成<hello,8>。在实际的Hadoop集群操作中,我们是由多台主机一起进行MapReduce的,如果加入规约操作,每一台主机在reduce之前先进行一次对本机数据的规约,然后再通过集群进行reduce操作,这样就大大节省reduce时间,从而加快了MapReduce的处理速度。

但是:特别值得注意的一点是,一个combiner只是处理一个节点中的输出,而不能享受向reduce一样的输入(经过了shuffle阶段的数据),这点非常关键

combiner:

前面展示的流水线忽略了一个可以优化MapReduce作业所使用的带宽的步骤,这个过程叫Combiner,它在Mapper之后,Reducer之前运行。combienr是可选的,如果这个过程适合于你的作业,Combiner实例会在每一个运行map任务的节点上运行。Combiner会接收特定节点上的Mapper实例的输出作为输入,接着Combiner的输出会被送到Reducer那里,而不是发送Mapper的输出。Combiner是一个“迷你reduce过程”,他只处理单台机器生成的数据。

参考别的大神的博客:
https://blog.csdn.net/guoery/article/details/8529004

MapReduce内部shuffle过程详解(Combiner的使用)相关推荐

  1. MapReduce:Shuffle过程详解

    MapReduce:Shuffle过程详解 1.Map任务处理 1.1 读取HDFS中的文件.每一行解析成一个<k,v>.每一个键值对调用一次map函数.                & ...

  2. Shuffle过程详解

    Shuffle过程详解 Shuffle过程是MapReduce的核心,最近看了很多资料,网上说法大体相同,但有些地方有一点点出入,就是各个阶段的执行顺序 总个shuffle过程可以看做是从map输出到 ...

  3. hadoop: Shuffle过程详解 (转载)

    原文地址:http://langyu.iteye.com/blog/992916 另一篇博文:http://www.cnblogs.com/gwgyk/p/3997849.html Shuffle过程 ...

  4. Mapreduce中maptask过程详解

    一.Maptask并行度与决定机制 1.一个job任务的map阶段的并行度默认是由该任务的大小决定的: 2.一个split切分分配一个maprask来并行处理: 3.默认情况下,split切分的大小等 ...

  5. mapreduce图示原理深入详解,几张图搞定

    1. MAPREDUCE原理篇 Mapreduce 是一个分布式运算程序的 编程框架 , 是用户开发"基于 hadoop 的数据分析应用"的核心框架: Mapreduce 核心功能 ...

  6. Hadoop学习之Mapreduce执行过程详解

    一.MapReduce执行过程 MapReduce运行时,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,大体执行流程如下图所示: ...

  7. Mysql 优化器内部JOIN算法hash join On-Disk Hash Join Grace Hash Join Hybrid hash join过程详解

    Mysql 各种hash join算法讲解 hash join的概述 提到hash join之前自然得说Nest loop join,以两个表的关联为例,它其实是个双层循环,先遍历外层的表(n条),再 ...

  8. Mysql 优化器内部JOIN算法hash join Nestloopjoin及classic hash join CHJ过程详解

    Mysql hash join之classic hash join CHJ过程详解 hash join的历史 优化器里的hash join算法在SQL Server.Oracle.postgress等 ...

  9. Hadoop之Shuffle机制详解

    Hadoop之Shuffle机制详解 目录 Shuffle机制 Partition分区 WritableComparable排序 Combiner合并 GroupingComparator分组(辅助排 ...

最新文章

  1. ubuntu16.04安装mysql5.7.15
  2. 【百家稷学】深度学习与计算机视觉核心理论与实践(中国地质大学实训)
  3. python---------sys.argv的作用
  4. GC(垃圾处理机制)面试加薪必备
  5. asp.net core mvc 管道之中间件
  6. html5游戏作弊码,HTML5新手必备的入门指南秘籍
  7. foolegg126/gooflow - 码云 - 开源中国
  8. idea破解失败无法打开
  9. HDU 5857 Median
  10. java省略号_在Java中使用省略号(…)
  11. 免费wechat机器人教程
  12. 学计算机r7000和y7000哪个好,联想拯救者r7000p和y7000p哪个好-联想拯救者r7000p和y7000p评测对比...
  13. 《三体》死神永生之感
  14. 什么是异地双活及应用场景
  15. 美团CAT客户端接入方式
  16. 如何从量化的角度观看股票传统的技术指标
  17. kcl方程独立性的图论证明
  18. 阿里云服务器购买合同怎么申请
  19. 手机APP开发之MIT Appinventor详细实战教程(十),标准登陆界面的逻辑设计和数据库的有效使用
  20. java古诗_java抓取古诗文的单线程爬虫

热门文章

  1. Zookeeper详细介绍+dubbo简单介绍+简单大白话讲解
  2. 趣未来科技董事长黄婵娇:专注创新研发,把公司当做科研机构来运作!
  3. 图的应用(学校导游系统)
  4. 基础理论之永磁同步电机
  5. 【IJCV2021】 实用人脸关键点检测器PIPNet:快!准!稳!
  6. 微型计算机中鼠标属于什么设备,在微型计算机系统中,显示器、键盘、鼠标都属于输入...
  7. Idea的安装以及相关配置
  8. 【汇正财经】股票涨跌由哪些因素决定?决定股票涨跌的因素?
  9. 顾问成长 3 --当你是虾米时要长骨格
  10. HDU 3047 Zjnu Stadium