1. Preface

The reduce side pulls many small files from the Mapper tasks. Each file is internally sorted, but the collection as a whole is not, so Reduce merges the small files and runs a merge-sort pass over them, producing one totally ordered file.
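To make that merge step concrete, here is a minimal k-way merge over sorted runs using a priority queue. This is only the algorithm in standalone form; Hadoop's real merger works on serialized segments spilled to disk, and the class and names below are invented for the demo.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;

  // Minimal k-way merge over sorted runs, the same idea the reduce side
  // applies to the sorted map outputs it has pulled.
  public class KWayMergeSketch {
    public static List<Integer> merge(List<List<Integer>> sortedRuns) {
      // Heap entries: {current value, run index}, ordered by value.
      PriorityQueue<int[]> heap =
          new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
      List<Iterator<Integer>> its = new ArrayList<>();
      for (int i = 0; i < sortedRuns.size(); i++) {
        Iterator<Integer> it = sortedRuns.get(i).iterator();
        its.add(it);
        if (it.hasNext()) heap.add(new int[]{it.next(), i});
      }
      List<Integer> out = new ArrayList<>();
      while (!heap.isEmpty()) {
        int[] top = heap.poll();            // smallest head among all runs
        out.add(top[0]);
        Iterator<Integer> it = its.get(top[1]);
        if (it.hasNext()) heap.add(new int[]{it.next(), top[1]});
      }
      return out;
    }

    public static void main(String[] args) {
      System.out.println(merge(Arrays.asList(
          Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8))));
      // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
  }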

2. Code

The ReduceTask source:

  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
      throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;

    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector =
      (null != combinerClass) ?
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    Class<? extends ShuffleConsumerPlugin> clazz =
      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext =
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                  super.lDirAlloc, reporter, codec,
                  combinerClass, combineCollector,
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    shuffleConsumerPlugin.init(shuffleContext);

    rIter = shuffleConsumerPlugin.run();    // iterates in sorted order

    // free up the data structures
    mapOutputFilesOnDisk.clear();
    sortPhase.complete();                   // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    RawComparator comparator =
      job.getOutputValueGroupingComparator();  // grouping comparator, see Source 1 below

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator,  // see Source 2 below
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator,
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

Source 1: the grouping comparator.

  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    // when the user has not set a grouping comparator, use the default
    if (theClass == null) {
      return getOutputKeyComparator();   // see Source 1.1 below
    }
    return ReflectionUtils.newInstance(theClass, this);
  }

Source 1.1: the sort comparator. When the user has not set a grouping comparator, the sort comparator is used in its place; and if the user has not configured a sort comparator either, the default comparator for the map output key class is used.

  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
      getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }
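From the user side, this fallback chain is fed by Job.setSortComparatorClass (read back via JobContext.KEY_COMPARATOR) and Job.setGroupingComparatorClass (read back via JobContext.GROUP_COMPARATOR_CLASS). A sketch of a custom grouping comparator, assuming a composite Text key such as "user1#2018" (the key layout and the class name are invented for the example):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  // Groups Text keys by the part before the first '#', so "user1#2018"
  // and "user1#2019" arrive in the same reduce() call.
  public class PrefixGroupingComparator extends WritableComparator {
    public PrefixGroupingComparator() {
      super(Text.class, true);   // true: instantiate keys so compare() gets objects
    }

    private static String prefix(String k) {
      int i = k.indexOf('#');
      return i < 0 ? k : k.substring(0, i);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      return prefix(a.toString()).compareTo(prefix(b.toString()));
    }
  }

  // Wiring it up in the driver:
  //   Job job = Job.getInstance(conf);
  //   job.setGroupingComparatorClass(PrefixGroupingComparator.class);
  //   // optionally also: job.setSortComparatorClass(...)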

Summary:

It is the Map side that truly establishes (adjusts) key order. The Reduce side never re-sorts the data it pulls over; it depends entirely on the sorted output of the Map side.

Source 2: the runNewReducer implementation.

  @SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException, InterruptedException, ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;   // the real iterator
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
      createReduceContext(reducer, job, getTaskID(),
                          rIter,                 // the iterator is handed in while building the context
                          reduceInputKeyCounter,
                          reduceInputValueCounter,
                          trackedRW,
                          committer,
                          reporter,
                          comparator,            // the comparator, see Source 2.1 below
                          keyClass, valueClass);
    try {
      reducer.run(reducerContext);  // with the context built, run Reducer.run(), see Source 2.2 below
    } finally {
      trackedRW.close(reducerContext);
    }
  }

Source 2.1: createReduceContext builds the context; the ReduceContextImpl constructor:

  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input,   // the iterator is handed to the input field
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

  /** Start processing next unique key. */
  public boolean nextKey() throws IOException, InterruptedException {
    // this is the logic behind context.nextKey() in Reducer.run()
    while (hasMore && nextKeyIsSame) {   // false on the first call, so the loop is skipped
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(),
                 nextVal.getLength() - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }

    hasMore = input.next();
    if (hasMore) {
      nextKey = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                         currentRawKey.getLength(),
                                         nextKey.getData(),
                                         nextKey.getPosition(),
                                         nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;  // does the current key equal the next key?
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }
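A toy re-enactment of that nextKeyIsSame mechanism: over an already-sorted stream of (key, value) pairs, compare the current key with the next one and start a new group only when the comparator says they differ. Plain Strings stand in for the serialized byte buffers the real code compares; the class is invented for the demo.

  import java.util.Arrays;
  import java.util.Comparator;
  import java.util.List;

  public class GroupBoundarySketch {
    public static void main(String[] args) {
      List<String[]> sorted = Arrays.asList(
          new String[]{"a", "1"}, new String[]{"a", "2"},
          new String[]{"b", "3"}, new String[]{"b", "4"}, new String[]{"c", "5"});
      Comparator<String> cmp = Comparator.naturalOrder();

      int i = 0;
      while (i < sorted.size()) {
        String key = sorted.get(i)[0];
        System.out.print("reduce(" + key + "): ");
        // Consume values while "nextKeyIsSame" holds.
        boolean nextKeyIsSame = true;
        while (i < sorted.size() && nextKeyIsSame) {
          System.out.print(sorted.get(i)[1] + " ");
          i++;
          nextKeyIsSame = i < sorted.size()
              && cmp.compare(key, sorted.get(i)[0]) == 0;
        }
        System.out.println();
      }
      // prints: reduce(a): 1 2 / reduce(b): 3 4 / reduce(c): 5
    }
  }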

Source 2.2: the Reducer class.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p><blockquote><pre>
 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
 *                                                 Key,IntWritable&gt; {
 *   private IntWritable result = new IntWritable();
 *
 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote></p>
 *
 * @see Mapper
 * @see Partitioner
 */
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        // context.nextKey() ends up calling nextKeyValue(), which updates
        // hasMore, nextKeyIsSame, key and value
        reduce(context.getCurrentKey(), context.getValues(), context);  // see Source 2.2.1 below
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if (iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
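The javadoc above notes that advanced application writers can override run() itself to control how the reduce task works. A minimal sketch of that, assuming a made-up CountingReducer that counts key groups around the standard loop (the backup-store reset from the template is omitted for brevity):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public class CountingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable sum = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int s = 0;
      for (IntWritable v : values) s += v.get();
      sum.set(s);
      context.write(key, sum);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      int groups = 0;
      try {
        while (context.nextKey()) {     // advances to the next distinct key
          groups++;
          reduce(context.getCurrentKey(), context.getValues(), context);
        }
      } finally {
        System.err.println("processed " + groups + " key groups");
        cleanup(context);
      }
    }
  }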

Source 2.2.1: context.getValues() is ultimately backed by an iterator:

  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    private boolean inReset = false;
    private boolean clearMarkFlag = false;

    @Override
    public boolean hasNext() {
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        }
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      if (inReset) {
        try {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(),
                         next.getLength() - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      }

      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        // the iterator holds no data of its own: next() delegates to
        // nextKeyValue(), which in turn reads from the underlying input
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);
      }
    }
    // ... (mark/reset methods omitted)
  }
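The backupStore branch above exists to support mark/reset over the values, which user code reaches through the public MarkableIterator wrapper. A small sketch, assuming a two-pass aggregation over one key's values (TwoPassReducer and its key/value types are invented for the demo):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.MarkableIterator;
  import org.apache.hadoop.mapreduce.Reducer;

  public class TwoPassReducer extends Reducer<Text, IntWritable, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      MarkableIterator<IntWritable> it =
          new MarkableIterator<>(values.iterator());
      it.mark();                       // remember the start of the group
      int max = Integer.MIN_VALUE;
      while (it.hasNext()) max = Math.max(max, it.next().get());
      it.reset();                      // rewind to the mark for a second pass
      int count = 0;
      while (it.hasNext()) if (it.next().get() == max) count++;
      context.write(key, new Text("max=" + max + " occurs " + count + "x"));
    }
  }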

Summary: the above describes one flow. Reduce pulls back a data set and wraps it in an iterator, the real iterator. On top of it, ReduceContext exposes methods of its own, including nextKeyValue(), which indirectly updates the key and the value. Reducer.run() contains a while loop that calls nextKey(), which under the hood also calls nextKeyValue(); it then calls reduce(), passing in context.getCurrentKey() and context.getValues(). Iterating over the values goes through hasNext() and next(), and next() delegates to the real iterator, so the data is ultimately drawn from it. The real iterator maintains an important flag, nextKeyIsSame, which hasNext() uses to decide whether the next key is still the same, so iteration yields exactly one group of records.

PS: one additional point worth noting:

next() calls nextKeyValue(), which truly overwrites the key and value. What gets passed around are references, so the data in the same block of memory is modified in place.
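This reuse is a classic pitfall: buffering values straight out of the iterator buffers one object that keeps being overwritten, so make a defensive copy when you need to keep them. A small sketch of both behaviors (BufferingReducer and its types are invented for the demo):

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public class BufferingReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<Text> buggy = new ArrayList<>();
      List<Text> safe = new ArrayList<>();
      for (Text v : values) {
        buggy.add(v);            // same object every time: all entries
                                 // end up equal to the last value seen
        safe.add(new Text(v));   // defensive copy keeps each value
      }
      context.write(key, new Text("kept " + safe.size() + " distinct copies"));
    }
  }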

This post is being updated continuously; feel free to follow my WeChat official account, LHWorld.

Reposted from: https://www.cnblogs.com/LHWorldBlog/p/8254143.html
