文章目录

  • 前言
  • 流程图
  • Reduce都干了哪些事?
  • 源码分析
    • 1.run方法
      • 1.1 比较器getOutputValueGroupingComparator
        • 1.1.1 getOutputKeyComparator
      • 1.2runNewReducer方法
        • 1.2.1 createReduceContext方法
          • 1.2.1.1 ReduceContextImpl方法
            • 1.2.1.1.1 nextKey方法
            • 1.2.1.1-1 nextKeyValue方法
            • 1.2.1.1-2 getCurrentKey方法
            • 1.2.1.1-3 getValues方法
            • 1.2.1.1.2-1 ValueIterable
            • 1.2.1.1.2-2 内部类ValueIterator
  • 最后说:

前言

位置:hadoop-mapreduce-client-core-2.6.5.jar org.apache.hadoop.mapred下的ReduceTask
上一次我们分析了MapReduce的map-output流程(点击查看!),现在我们来分析reduce流程。

流程图

Reduce都干了哪些事?


1.shuffle
怎么拉取的数据先不管了,反正结果就是都拉回来了
2.sort(可以理解为归并排序)
reduce拉回来的小文件,也是内部有序、外部无序,要使用归并算法来进行排序。还有个SecondarySort,二次排序。它触发的是grouping comparator分组比较,也就是原语说的:相同的key为一组(reduce的分组强依赖map的排序,也就是排序有问题,分组一定有问题)。
3.Reduce
最后通过对“假迭代器”的迭代,完成数据的归约

源码分析

看一下Redecer.class的run方法源码:
run方法被调起的时候,会有setup、循环中调用reduce方法以及最后的cleanup方法。reduce的输入来源于上下文context的nextKey(),即是否有下一组。

 public void run(Context context) throws IOException, InterruptedException {setup(context);try {//循环调用reduce,有下一组就把K、V传进去。while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context);// If a back up store is used, reset itIterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        }}} finally {cleanup(context);}}

想一下数据是怎么传的?因为数据量有可能很大,怎么规避内存溢出的问题的?container容器中反射ReduceTask类变成对象,然后通过这个对象调用ReduceTask的run方法。

1.run方法

调用的是ReduceTask的run方法,“三步走”策略:拷贝copyPhase 、排序sortPhase 、reduce归约阶段reducePhase 。然后经过shuffle拉取数据的插件跑完,就会得到从所有map端拉回来的多个内部有序、外部无序的小文件。这些小文件经过“归并排序”,就会得到一个迭代器rIter(即“真迭代器”,它就是reduce的数据输入源!)。当然也会有“假迭代器”,我们在自定义的MyReducer中的迭代器,就是这个“假迭代器”!

通过job.getOutputValueGroupingComparator()得到一个比较器——comparator 。在Map阶段也有这么一个比较器,是为快速排序准备的,所有叫排序比较器。Reduce阶段是用来分组的,叫分组比较器。比较器的结果就3种:小于0、等于0、大于0。等于0就证明是一组的,否则不同组。

@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//“三步走”策略if (isMapOrReduce()) {copyPhase = getProgress().addPhase("copy");sortPhase  = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}//此处省略一万行代码......//迭代器引用,调用shuffle拉取的插件。只要run完,就证明shuffle已经拉完了,//完事就会得到一个迭代器被rIter引用,也就是所谓的“真迭代器”rIter = shuffleConsumerPlugin.run();// free up the data structuresmapOutputFilesOnDisk.clear();sortPhase.complete();                         // sort is completesetPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();//比较器RawComparator comparator = job.getOutputValueGroupingComparator();if (useNewApi) {runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}

1.1 比较器getOutputValueGroupingComparator

如何取分组比较器?直接取,用户配置了,就取用户的。否则没有取到(null),就直接返回getOutputKeyComparator

public RawComparator getOutputValueGroupingComparator() {Class<? extends RawComparator> theClass = getClass(//看用户是否配置了,如果用户干预了,就直接通过K拿出VJobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);//否则用户未配置,就返回key的比较器if (theClass == null) {return getOutputKeyComparator();}return ReflectionUtils.newInstance(theClass, this);}

1.1.1 getOutputKeyComparator

用户没配置分组比较器,就进入这个方法了。看用户是否在Map阶段设置了排序比较器,如果有设置的排序比较器,就取排序比较器。假若用户连排序比较器都没设置,那就只能取key自己的排序比较器了。这个方法也就是Map端取排序比较器的方法,这里复用了。原因上面已经说过了,reduce的分组强依赖与map端的排序,map按照某规则排好序,reduce才好参考排序规则制定分组规则进行分组。所以也就是说,如果排序不正确,那么分组就一定有问题。
再往下,就是对我们使用的api进行条件判断,我们只关心runNewReducer。

 public RawComparator getOutputKeyComparator() {//如果能渠道排序比较器,那最好了Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);//排序比较器也没有,就只能用key自己的排序比较器了if (theClass != null)return ReflectionUtils.newInstance(theClass, this);return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);}

1.2runNewReducer方法

job、真迭代器rIter和比较器comparator都被当做参数传了进来,并对真迭代器rIter进行了包装,然后准备了task的上下文——taskContext 、调用createReduceContext方法传参(reducer对象、rIter及comparator)生成reducer上下文——reducerContext

@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {// wrap value iterator to report progress.final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};// 准备了任务上下文org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);//准备reduce对象org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);//reduce输出org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//准备了reducer的上下文reducerContext (通过调用createReduceContext方法,传参reducer、rIter、comparator)org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,valueClass);try {reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}

1.2.1 createReduceContext方法

任务就是生成reducer上下文——reducerContext。该方法只不过就是个皮囊,核心还是new了一个ReduceContextImpl,把我们传入该方法的参数reducer、rIter、comparator等传入了ReduceContextImpl这个方法中。然后ReduceContextImpl方法负责生成reduceContext,并调用本方法又new了一个WrappedReducer,将reduceContext传入,生成reducerContext对象直接return

@SuppressWarnings("unchecked")protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.ContextcreateReduceContext(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,Configuration job,org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter,org.apache.hadoop.mapreduce.Counter inputKeyCounter,org.apache.hadoop.mapreduce.Counter inputValueCounter,org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, org.apache.hadoop.mapreduce.OutputCommitter committer,org.apache.hadoop.mapreduce.StatusReporter reporter,RawComparator<INKEY> comparator,Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException {//得到reduceContext才是目的org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);//将reduceContext当做参数传过来,生成reducerContext对象,直接returnorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(reduceContext);return reducerContext;}
1.2.1.1 ReduceContextImpl方法

负责生成reduceContext对象,“真迭代器”rIter传入该构造方法,名字变成了RawKeyValueIterator input,也就是reduce端的输入源。随之传入进来的参数,通过this.xxx=xxx;来提升作用域。既然提升了input的作用域,就证明它就一定在本类的其他地方会被用到。在Reducer.class类中,我们一般会调用context.nextKey()方法、context.getCurrentKey()方法、context.getValues()这3个方法。因此上下文中就一定有这3个方法,换句话说,ReduceContextImpl中就一定会有这3个方法。在reduce端只要任务启动完了,拉完数据了,真迭代器也准备就绪了,下面就该处理数据了。然后就该调用Reducer.class的run方法,run方法一跑起来,就会调用context.nextKey()方法、context.getCurrentKey()方法、context.getValues()这3个方法。

public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,RawKeyValueIterator input, Counter inputKeyCounter,Counter inputValueCounter,RecordWriter<KEYOUT,VALUEOUT> output,OutputCommitter committer,StatusReporter reporter,RawComparator<KEYIN> comparator,Class<KEYIN> keyClass,Class<VALUEIN> valueClass) throws InterruptedException, IOException{super(conf, taskid, output, committer, reporter);this.input = input;//真迭代器,改名了this.inputKeyCounter = inputKeyCounter;this.inputValueCounter = inputValueCounter;this.comparator = comparator;this.serializationFactory = new SerializationFactory(conf);this.keyDeserializer = serializationFactory.getDeserializer(keyClass);this.keyDeserializer.open(buffer);this.valueDeserializer = serializationFactory.getDeserializer(valueClass);this.valueDeserializer.open(buffer);hasMore = input.next();//初始肯定有数据,因为数据刚拉来,还没发起计算this.keyClass = keyClass;this.valueClass = valueClass;this.conf = conf;this.taskid = taskid;}
1.2.1.1.1 nextKey方法

本方法先做了一个判断:是否还有数据&&下一组的key是否和我一样。其中hashMore的初始值,来源于ReduceContextImpl类的构造方法: hasMore = input.next();这里肯定有数据,因为迭代器刚刚才拉来的数据,还并未开始计算。关键就是nextKeyIsSame的初始值定义的就是false。

public boolean nextKey() throws IOException,InterruptedException {//是否还有数据&&下一组key是否和我一样//第一次进来,前面true,后面false,整体就是false,不执行while (hasMore && nextKeyIsSame) {nextKeyValue();}if (hasMore) {if (inputKeyCounter != null) {inputKeyCounter.increment(1);}return nextKeyValue();} else {return false;}}
1.2.1.1-1 nextKeyValue方法

上来就先判断hasMore,!hasMore=false,if不执行。之后,该方法就会做2件大事:

  • 1.map阶段会把输出的K、V序列化为字节数组,存在于文件中。reduce把这些文件拉回来了,要想用这些K、V,就必须反序列化。调用该方法,就是对k、v反序列化,并赋值。
  • 2.反序列化得到K、V之后,就又从真迭代器中再取,看是否还有数据。然后就比较这2个key是否相等,并将结果更新给nextKeyIsSame

这两件事做完了,只要nextKeyIsSame 赋值成功了,就会在Reducer的while循环中调用reduce方法。事情发展到这一步,数据就已经根据key是否相同分好组了(nextKeyIsSame =true证明是一组),此刻的迭代器也就从“真迭代器”(rIter、input)变身为“假迭代器”了。调用reduce方法,传参是context.getCurrentKey()和context.getValues()。当然,这两个也是通过context调用的,也就一定能在ReduceContextImpl实现中找到对应的方法。

@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//第一次进来,hasMore为true,取反就是false,不执行if (!hasMore) {key = null;value = null;return false;}//nextKeyIsSame初始值为false,因此firstValue为truefirstValue = !nextKeyIsSame;DataInputBuffer nextKey = input.getKey();currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());//从input中得到序列化的key,然后通过反序列化得到最原始的keykey = keyDeserializer.deserialize(key);DataInputBuffer nextVal = input.getValue();buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()- nextVal.getPosition());//反序列化得到v,并赋值给valuevalue = valueDeserializer.deserialize(value);currentKeyLength = nextKey.getLength() - nextKey.getPosition();currentValueLength = nextVal.getLength() - nextVal.getPosition();if (isMarked) {backupStore.write(nextKey, nextVal);}hasMore = input.next();//看看除了刚才的kv数据,还有没别的数据了//如果有第二条,就拿出来。并使用分组比较器,比较上一次取出的key及//现在刚取出来的key,如果结果==0就证明它俩相等。最后将这2个key的比较结果更新给nextKeyIsSame if (hasMore) {nextKey = input.getKey();nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(),nextKey.getData(),nextKey.getPosition(),nextKey.getLength() - nextKey.getPosition()) == 0;} else {nextKeyIsSame = false;}inputValueCounter.increment(1);return true;}
1.2.1.1-2 getCurrentKey方法

直接把key返回了。

 public KEYIN getCurrentKey() {return key;}
1.2.1.1-3 getValues方法

直接返回的是迭代器——iterable。而iterable是通过new的内部类ValueIterable ,iterable = new ValueIterable();

 public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {return iterable;}
1.2.1.1.2-1 ValueIterable

iterable是通过new的内部类ValueIterable 生成的

 protected class ValueIterable implements Iterable<VALUEIN> {private ValueIterator iterator = new ValueIterator();@Overridepublic Iterator<VALUEIN> iterator() {return iterator;} }
1.2.1.1.2-2 内部类ValueIterator

这个内部类有点长,就用伪代码表示了。。。通过new ValueIterator,返回的iterator,供ValueIterable使用

protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {//是否有下一跳public boolean hasNext() {}//用来取值public VALUEIN next() {}
}

最后说:

我们自定义的MyMapper中得到的是“假迭代器”,之所以说是“假迭代器”,是因为这个迭代器中的数据不是全部拉取来的,而是分组处理后得来的。每当调用nextKeyValue方法,就会更新nextKeyIsSame。只要nextKeyIsSame=true,就是一组的。当迭代到两组边缘的时候,nextKeyIsSame就会变成false,自然这一组数据就封了。然后“假迭代器”迭代第一组数据的动作就戛然而止了。

简单的总结一下,就是我读一行nextKeyValue,然后预判断第二行nextKeyIsSame。nextKeyIsSame=true就继续读第二行,预判断第三行。只要nextKeyIsSame=true,不管读到多少行,都是一组。第一组结束了,reduce方法就结束了。然后开始第二组迭代,再次调起reduce方法。

从源码角度分析MapReduce的reduce流程相关推荐

  1. 从源码角度分析MapReduce的map-output流程

    文章目录 前言 流程图 源码分析 1 runNewMapper方法 2.NewOutputCollector方法 2.1 createSortingCollector方法 2.1.1 collecto ...

  2. 【Android 插件化】Hook 插件化框架 ( 从源码角度分析加载资源流程 | Hook 点选择 | 资源冲突解决方案 )

    Android 插件化系列文章目录 [Android 插件化]插件化简介 ( 组件化与插件化 ) [Android 插件化]插件化原理 ( JVM 内存数据 | 类加载流程 ) [Android 插件 ...

  3. Mybatis底层原理学习(二):从源码角度分析一次查询操作过程

    在阅读这篇文章之前,建议先阅读一下我之前写的两篇文章,对理解这篇文章很有帮助,特别是Mybatis新手: 写给mybatis小白的入门指南 mybatis底层原理学习(一):SqlSessionFac ...

  4. 从源码的角度分析MapReduce的map-input流程

    前言 之前我们对MapReduce中Client提交Job作业的流程进行了源码分析(点击查看Client提交Job作业源码分析),今天我们来分析一下map-input阶段的源码. 源码位置 hadoo ...

  5. 从源码角度分析 Mybatis 工作原理

    作者:vivo互联网服务器团队-Zhang Peng 一.MyBatis 完整示例 这里,我将以一个入门级的示例来演示 MyBatis 是如何工作的. 注:本文后面章节中的原理.源码部分也将基于这个示 ...

  6. Hadoop(十二):从源码角度分析Hadoo是如何将作业提交给集群的

    为什么80%的码农都做不了架构师?>>>    一:MapReduce提交作业过程的流程图 通过图可知主要有三个部分,即: 1) JobClient:作业客户端. 2) JobTra ...

  7. 带你从源码角度分析ViewGroup中事件分发流程

    序言 这篇博文不是对事件分发机制全面的介绍,只是从源码的角度分析ACTION_DOWN.ACTION_MOVE.ACTION_UP事件在ViewGroup中的分发逻辑,了解各个事件在ViewGroup ...

  8. 从源码角度分析Android中的Binder机制的前因后果

    为什么在Android中使用binder通信机制? 众所周知linux中的进程通信有很多种方式,比如说管道.消息队列.socket机制等.socket我们再熟悉不过了,然而其作为一款通用的接口,通信开 ...

  9. 【易懂】Java源码角度分析put()与putIfAbsent()的区别——源码分析系列

    一.put()方法 1. 源码分析 Java中并未给出put()的源码,因此我们看一下put()方法中给出的注释: Associates the specified value with the sp ...

最新文章

  1. 计算机二进制加减符号,(带符号的二进制数的表示方法及加减法运算).ppt
  2. 160个Crackme036
  3. linux 挂起 移动电脑,linux 系统挂起
  4. VMware QueryPerformanceCounter/GetTickCount 悬案
  5. android 删除模拟器,android – 如何从avd设备中删除脱机模拟器?
  6. 如何在openGauss 2.1.0中使用Job?
  7. 常考的java数据库笔试题
  8. 关于debian网卡驱动
  9. matlab中进行太阳能电池模型,基于Matlab的光伏发电系统仿真研究
  10. mysql表名大小写设置
  11. Word | 图片被文字遮挡
  12. CAPM模型的应用--回归模型中的Alpha, r_f
  13. 在校园网的环境下用树莓派搭建私人云
  14. 【积跬步以至千里】Markdownpad2报错: Html Rendering Error:An error occurred with the HTML rendering component。
  15. 圆周率不用计算机怎么算,为何圆周率算了这么多年还没算完?就连超级计算机都“无可奈何”...
  16. 考试管理系统-刷题系统案题目选项编写
  17. 李建忠老师-设计模式
  18. 登陆id显示无法连接服务器失败,无法连接id服务器失败怎么办
  19. 委内瑞拉经济衰退导致通货膨胀
  20. Android 翻页效果 电子书 (转)

热门文章

  1. 数独基本规则_数独解题技巧总结
  2. 小米Max怎么刷入开发版获得root超级权限
  3. Exchange数据库无法装载的问题
  4. Redis系列-远程连接redis并给redis加锁
  5. 44.作用域,局部和全局变量
  6. MVC5+EF6 入门完整教程七
  7. Eclipse常见问题集锦
  8. 艾伟也谈项目管理,在团队中如何推行一项新的实践
  9. push and pop
  10. java-mybatis环境搭建