Hadoop之Shuffle机制详解


目录

  1. Shuffle机制
  2. Partition分区
  3. WritableComparable排序
  4. Combiner合并
  5. GroupingComparator分组(辅助排序)

1. Shuffle机制

Mapreduce确保每个reducer的输入都是按key排序的。系统执行排序的过程(即将mapper输出作为输入传给reducer)称为shuffle,如下图所示


2. Partition分区

问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  1. 默认partition分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}
默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

  1. 自定义Partitioner步骤

    1. 自定义类继承Partitioner,重写getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 1 获取电话号码的前三位String preNum = key.toString().substring(0, 3);int partition = 4;// 2 判断是哪个省if ("136".equals(preNum)) {partition = 0;}else if ("137".equals(preNum)) {partition = 1;}else if ("138".equals(preNum)) {partition = 2;}else if ("139".equals(preNum)) {partition = 3;}return partition;}
}
  1. 在job驱动中,设置自定义partitioner:
job.setPartitionerClass(CustomPartitioner.class);
  1. 自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);

注意

  1. 如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

  2. 如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

  3. 如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

    例如:假设自定义分区数为5,则
    (1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
    (2)job.setNumReduceTasks(2);会报错
    (3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件


3. WritableComparable排序

排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。

对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。

每个阶段的默认排序

  1. 排序的分类

    1. 部分排序:区内排序——环形缓冲区
      MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
    2. 全排序:
      如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
      替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。
    3. 辅助排序:(GroupingComparator分组)
      Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
    4. 二次排序:
      在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
  2. 自定义排序WritableComparable
    (1)原理分析
    bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override
public int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

4. Combiner合并

  1. combiner是MR程序中Mapper和Reducer之外的一种组件。

  2. combiner组件的父类就是Reducer。

  3. combiner和reducer的区别在于运行的位置:

    1. Combiner是在每一个maptask所在的节点运行;
    2. Reducer是接收全局所有Mapper的输出结果;
  4. combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。

  5. combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。

  6. 自定义Combiner实现步骤:

    1. 自定义一个combiner继承Reducer,重写reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 汇总操作int count = 0;for(IntWritable v :values){count += v.get();}// 2 写出context.write(key, new IntWritable(count));}
}
  1. 在job驱动类中设置:
job.setCombinerClass(WordcountCombiner.class);

5. GroupingComparator分组(辅助排序)

对reduce阶段的数据根据某一个或几个字段进行分组。

Hadoop之Shuffle机制详解相关推荐

  1. Hadoop中RPC机制详解之Server端

    2019独角兽企业重金招聘Python工程师标准>>> Hadoop 中 RPC 机制详解之 Client 端 1. Server.Listener RPC Client 端的 RP ...

  2. shuffle机制详解

    将map输出作为输入传递给reducer的过程称为shuffle. Shuffle过程包含在Map和Reduce两端 map阶段大致过程为:   写数据,分区,排序,将属于同一分区的输出合并一起写在磁 ...

  3. Hadoop之Yarn工作机制详解

    Hadoop之Yarn工作机制详解 目录 Yarn概述 Yarn基本架构 Yarn工作机制 作业提交全过程详解 1. Yarn概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于 ...

  4. Hadoop之NameNode和SecondaryNameNode工作机制详解

    Hadoop之NameNode和SecondaryNameNode工作机制详解 NN和2NN工作机制 NN和2NN工作机制详解 Fsimage和Edits解析 checkpoint时间设置 1. NN ...

  5. 【大数据day14】——MapReduce的运行机制详解(案列:Reduce 端实现 JOIN, Map端实现 JOIN,求共同好友)

    文章目录 1 .MapReduce的运行机制详解 1.1:MapTask 工作机制 详细步骤 配置 1.2 :ReduceTask 工作机制 详细步骤 1.3:Shuffle 过程 2. 案例: Re ...

  6. Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍

    前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark 3.2.0 ...

  7. MapTask运行机制详解以及Map任务的并行度,ReduceTask 工作机制以及reduceTask的并行度,MapReduce总体工作机制

    MapTask运行机制详解 整个Map阶段流程大体如图所示 简单概述 inputFile通过split被逻辑切分为多个split文件, 通过Record按行读取内容给map(用户自己实现的)进行处理, ...

  8. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  9. MapReduce:Shuffle过程详解

    MapReduce:Shuffle过程详解 1.Map任务处理 1.1 读取HDFS中的文件.每一行解析成一个<k,v>.每一个键值对调用一次map函数.                & ...

最新文章

  1. [纪录]仿IOS滚轮效果(竖直滑动选择器)
  2. DataList 外部事件获取DataList内部值
  3. React 学习总结
  4. posixkill php,在linux中使用PHP的posix_kill()会导致奇怪的行为
  5. Spring-jdbc-JdbcTemplate
  6. Scipy教程 - python数值计算库
  7. MySQL主从复制技术(纯干货)
  8. 管理感悟:掌握工作的决定权
  9. 计算机组成与体系结构之Flynn分类法
  10. 俯瞰开源工作流引擎Activiti
  11. 《Spring》AOP实现原理
  12. swfupload 无法加载_SwfUpload在IE10上不出现上传按钮的解决方法
  13. 数字信号中的各种频率
  14. 汉语言文学的毕业论文有创意的选题有哪些?推荐一下?
  15. 三消游戏(检查游戏是否死局)
  16. Spring Security定义多个过滤器链(10)
  17. 计算机无法自动排列,为什么我的电脑不能自动排列图标
  18. 01 A股10个月争取翻10倍实盘操作记录(前言)
  19. Go 语言优势、对比
  20. 快速入门mybatis(查询、添加日志、插入)

热门文章

  1. selenium webdirver之rdoc使用
  2. Spring的声明式事务管理
  3. Math.abs为Integer.Min_VALUE返回错误的值
  4. CentOS安装scrot记
  5. 牛客 - 仓库选址(中位数+思维)
  6. POJ - 3630 Phone List(字典树)
  7. html遮罩实例,给原生html中添加水印遮罩层的实现示例
  8. linux dns配置bind9,DNS服务(bind9)配置过程
  9. java groovyshell_在java中使用groovy怎么搞
  10. python import xlrd 报错_python读取excel(xlrd)