从源码角度分析MapReduce的reduce流程
文章目录
- 前言
- 流程图
- Reduce都干了哪些事?
- 源码分析
- 1.run方法
- 1.1 比较器getOutputValueGroupingComparator
- 1.1.1 getOutputKeyComparator
- 1.2runNewReducer方法
- 1.2.1 createReduceContext方法
- 1.2.1.1 ReduceContextImpl方法
- 1.2.1.1.1 nextKey方法
- 1.2.1.1-1 nextKeyValue方法
- 1.2.1.1-2 getCurrentKey方法
- 1.2.1.1-3 getValues方法
- 1.2.1.1.2-1 ValueIterable
- 1.2.1.1.2-2 内部类ValueIterator
- 最后说:
前言
位置:hadoop-mapreduce-client-core-2.6.5.jar org.apache.hadoop.mapred下的ReduceTask
上一次我们分析了MapReduce的map-output流程(点击查看!),现在我们来分析reduce流程。
流程图
Reduce都干了哪些事?
1.shuffle
怎么拉取的数据先不管了,反正结果就是都拉回来了
2.sort(可以理解为归并排序)
reduce拉回来的小文件,也是内部有序、外部无序,要使用归并算法来进行排序。还有个SecondarySort,二次排序。它触发的是grouping comparator分组比较,也就是原语说的:相同的key为一组(reduce的分组强依赖map的排序,也就是排序有问题,分组一定有问题)。
3.Reduce
最后通过对“假迭代器”的迭代,完成数据的归约
源码分析
看一下Redecer.class的run方法源码:
run方法被调起的时候,会有setup、循环中调用reduce方法以及最后的cleanup方法。reduce的输入来源于上下文context的nextKey(),即是否有下一组。
public void run(Context context) throws IOException, InterruptedException {setup(context);try {//循环调用reduce,有下一组就把K、V传进去。while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context);// If a back up store is used, reset itIterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); }}} finally {cleanup(context);}}
想一下数据是怎么传的?因为数据量有可能很大,怎么规避内存溢出的问题的?container容器中反射ReduceTask类变成对象,然后通过这个对象调用ReduceTask的run方法。
1.run方法
调用的是ReduceTask的run方法,“三步走”策略:拷贝copyPhase 、排序sortPhase 、reduce归约阶段reducePhase 。然后经过shuffle拉取数据的插件跑完,就会得到从所有map端拉回来的多个内部有序、外部无序的小文件。这些小文件经过“归并排序”,就会得到一个迭代器rIter(即“真迭代器”,它就是reduce的数据输入源!)。当然也会有“假迭代器”,我们在自定义的MyReducer中的迭代器,就是这个“假迭代器”!
通过job.getOutputValueGroupingComparator()得到一个比较器——comparator 。在Map阶段也有这么一个比较器,是为快速排序准备的,所有叫排序比较器。Reduce阶段是用来分组的,叫分组比较器。比较器的结果就3种:小于0、等于0、大于0。等于0就证明是一组的,否则不同组。
@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//“三步走”策略if (isMapOrReduce()) {copyPhase = getProgress().addPhase("copy");sortPhase = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}//此处省略一万行代码......//迭代器引用,调用shuffle拉取的插件。只要run完,就证明shuffle已经拉完了,//完事就会得到一个迭代器被rIter引用,也就是所谓的“真迭代器”rIter = shuffleConsumerPlugin.run();// free up the data structuresmapOutputFilesOnDisk.clear();sortPhase.complete(); // sort is completesetPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();//比较器RawComparator comparator = job.getOutputValueGroupingComparator();if (useNewApi) {runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}
1.1 比较器getOutputValueGroupingComparator
如何取分组比较器?直接取,用户配置了,就取用户的。否则没有取到(null),就直接返回getOutputKeyComparator
public RawComparator getOutputValueGroupingComparator() {Class<? extends RawComparator> theClass = getClass(//看用户是否配置了,如果用户干预了,就直接通过K拿出VJobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);//否则用户未配置,就返回key的比较器if (theClass == null) {return getOutputKeyComparator();}return ReflectionUtils.newInstance(theClass, this);}
1.1.1 getOutputKeyComparator
用户没配置分组比较器,就进入这个方法了。看用户是否在Map阶段设置了排序比较器,如果有设置的排序比较器,就取排序比较器。假若用户连排序比较器都没设置,那就只能取key自己的排序比较器了。这个方法也就是Map端取排序比较器的方法,这里复用了。原因上面已经说过了,reduce的分组强依赖与map端的排序,map按照某规则排好序,reduce才好参考排序规则制定分组规则进行分组。所以也就是说,如果排序不正确,那么分组就一定有问题。
再往下,就是对我们使用的api进行条件判断,我们只关心runNewReducer。
public RawComparator getOutputKeyComparator() {//如果能渠道排序比较器,那最好了Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);//排序比较器也没有,就只能用key自己的排序比较器了if (theClass != null)return ReflectionUtils.newInstance(theClass, this);return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);}
1.2runNewReducer方法
job、真迭代器rIter和比较器comparator都被当做参数传了进来,并对真迭代器rIter进行了包装,然后准备了task的上下文——taskContext 、调用createReduceContext方法传参(reducer对象、rIter及comparator)生成reducer上下文——reducerContext
@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {// wrap value iterator to report progress.final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};// 准备了任务上下文org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);//准备reduce对象org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);//reduce输出org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());//准备了reducer的上下文reducerContext (通过调用createReduceContext方法,传参reducer、rIter、comparator)org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,valueClass);try {reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}
1.2.1 createReduceContext方法
任务就是生成reducer上下文——reducerContext。该方法只不过就是个皮囊,核心还是new了一个ReduceContextImpl,把我们传入该方法的参数reducer、rIter、comparator等传入了ReduceContextImpl这个方法中。然后ReduceContextImpl方法负责生成reduceContext,并调用本方法又new了一个WrappedReducer,将reduceContext传入,生成reducerContext对象直接return
@SuppressWarnings("unchecked")protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.ContextcreateReduceContext(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,Configuration job,org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter,org.apache.hadoop.mapreduce.Counter inputKeyCounter,org.apache.hadoop.mapreduce.Counter inputValueCounter,org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, org.apache.hadoop.mapreduce.OutputCommitter committer,org.apache.hadoop.mapreduce.StatusReporter reporter,RawComparator<INKEY> comparator,Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException {//得到reduceContext才是目的org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);//将reduceContext当做参数传过来,生成reducerContext对象,直接returnorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(reduceContext);return reducerContext;}
1.2.1.1 ReduceContextImpl方法
负责生成reduceContext对象,“真迭代器”rIter传入该构造方法,名字变成了RawKeyValueIterator input,也就是reduce端的输入源。随之传入进来的参数,通过this.xxx=xxx;来提升作用域。既然提升了input的作用域,就证明它就一定在本类的其他地方会被用到。在Reducer.class类中,我们一般会调用context.nextKey()方法、context.getCurrentKey()方法、context.getValues()这3个方法。因此上下文中就一定有这3个方法,换句话说,ReduceContextImpl中就一定会有这3个方法。在reduce端只要任务启动完了,拉完数据了,真迭代器也准备就绪了,下面就该处理数据了。然后就该调用Reducer.class的run方法,run方法一跑起来,就会调用context.nextKey()方法、context.getCurrentKey()方法、context.getValues()这3个方法。
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,RawKeyValueIterator input, Counter inputKeyCounter,Counter inputValueCounter,RecordWriter<KEYOUT,VALUEOUT> output,OutputCommitter committer,StatusReporter reporter,RawComparator<KEYIN> comparator,Class<KEYIN> keyClass,Class<VALUEIN> valueClass) throws InterruptedException, IOException{super(conf, taskid, output, committer, reporter);this.input = input;//真迭代器,改名了this.inputKeyCounter = inputKeyCounter;this.inputValueCounter = inputValueCounter;this.comparator = comparator;this.serializationFactory = new SerializationFactory(conf);this.keyDeserializer = serializationFactory.getDeserializer(keyClass);this.keyDeserializer.open(buffer);this.valueDeserializer = serializationFactory.getDeserializer(valueClass);this.valueDeserializer.open(buffer);hasMore = input.next();//初始肯定有数据,因为数据刚拉来,还没发起计算this.keyClass = keyClass;this.valueClass = valueClass;this.conf = conf;this.taskid = taskid;}
1.2.1.1.1 nextKey方法
本方法先做了一个判断:是否还有数据&&下一组的key是否和我一样。其中hashMore的初始值,来源于ReduceContextImpl类的构造方法: hasMore = input.next();这里肯定有数据,因为迭代器刚刚才拉来的数据,还并未开始计算。关键就是nextKeyIsSame的初始值定义的就是false。
public boolean nextKey() throws IOException,InterruptedException {//是否还有数据&&下一组key是否和我一样//第一次进来,前面true,后面false,整体就是false,不执行while (hasMore && nextKeyIsSame) {nextKeyValue();}if (hasMore) {if (inputKeyCounter != null) {inputKeyCounter.increment(1);}return nextKeyValue();} else {return false;}}
1.2.1.1-1 nextKeyValue方法
上来就先判断hasMore,!hasMore=false,if不执行。之后,该方法就会做2件大事:
- 1.map阶段会把输出的K、V序列化为字节数组,存在于文件中。reduce把这些文件拉回来了,要想用这些K、V,就必须反序列化。调用该方法,就是对k、v反序列化,并赋值。
- 2.反序列化得到K、V之后,就又从真迭代器中再取,看是否还有数据。然后就比较这2个key是否相等,并将结果更新给nextKeyIsSame
这两件事做完了,只要nextKeyIsSame 赋值成功了,就会在Reducer的while循环中调用reduce方法。事情发展到这一步,数据就已经根据key是否相同分好组了(nextKeyIsSame =true证明是一组),此刻的迭代器也就从“真迭代器”(rIter、input)变身为“假迭代器”了。调用reduce方法,传参是context.getCurrentKey()和context.getValues()。当然,这两个也是通过context调用的,也就一定能在ReduceContextImpl实现中找到对应的方法。
@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//第一次进来,hasMore为true,取反就是false,不执行if (!hasMore) {key = null;value = null;return false;}//nextKeyIsSame初始值为false,因此firstValue为truefirstValue = !nextKeyIsSame;DataInputBuffer nextKey = input.getKey();currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());//从input中得到序列化的key,然后通过反序列化得到最原始的keykey = keyDeserializer.deserialize(key);DataInputBuffer nextVal = input.getValue();buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()- nextVal.getPosition());//反序列化得到v,并赋值给valuevalue = valueDeserializer.deserialize(value);currentKeyLength = nextKey.getLength() - nextKey.getPosition();currentValueLength = nextVal.getLength() - nextVal.getPosition();if (isMarked) {backupStore.write(nextKey, nextVal);}hasMore = input.next();//看看除了刚才的kv数据,还有没别的数据了//如果有第二条,就拿出来。并使用分组比较器,比较上一次取出的key及//现在刚取出来的key,如果结果==0就证明它俩相等。最后将这2个key的比较结果更新给nextKeyIsSame if (hasMore) {nextKey = input.getKey();nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(),nextKey.getData(),nextKey.getPosition(),nextKey.getLength() - nextKey.getPosition()) == 0;} else {nextKeyIsSame = false;}inputValueCounter.increment(1);return true;}
1.2.1.1-2 getCurrentKey方法
直接把key返回了。
public KEYIN getCurrentKey() {return key;}
1.2.1.1-3 getValues方法
直接返回的是迭代器——iterable。而iterable是通过new的内部类ValueIterable ,iterable = new ValueIterable();
public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {return iterable;}
1.2.1.1.2-1 ValueIterable
iterable是通过new的内部类ValueIterable 生成的
protected class ValueIterable implements Iterable<VALUEIN> {private ValueIterator iterator = new ValueIterator();@Overridepublic Iterator<VALUEIN> iterator() {return iterator;} }
1.2.1.1.2-2 内部类ValueIterator
这个内部类有点长,就用伪代码表示了。。。通过new ValueIterator,返回的iterator,供ValueIterable使用
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {//是否有下一跳public boolean hasNext() {}//用来取值public VALUEIN next() {}
}
最后说:
我们自定义的MyMapper中得到的是“假迭代器”,之所以说是“假迭代器”,是因为这个迭代器中的数据不是全部拉取来的,而是分组处理后得来的。每当调用nextKeyValue方法,就会更新nextKeyIsSame。只要nextKeyIsSame=true,就是一组的。当迭代到两组边缘的时候,nextKeyIsSame就会变成false,自然这一组数据就封了。然后“假迭代器”迭代第一组数据的动作就戛然而止了。
简单的总结一下,就是我读一行nextKeyValue,然后预判断第二行nextKeyIsSame。nextKeyIsSame=true就继续读第二行,预判断第三行。只要nextKeyIsSame=true,不管读到多少行,都是一组。第一组结束了,reduce方法就结束了。然后开始第二组迭代,再次调起reduce方法。
从源码角度分析MapReduce的reduce流程相关推荐
- 从源码角度分析MapReduce的map-output流程
文章目录 前言 流程图 源码分析 1 runNewMapper方法 2.NewOutputCollector方法 2.1 createSortingCollector方法 2.1.1 collecto ...
- 【Android 插件化】Hook 插件化框架 ( 从源码角度分析加载资源流程 | Hook 点选择 | 资源冲突解决方案 )
Android 插件化系列文章目录 [Android 插件化]插件化简介 ( 组件化与插件化 ) [Android 插件化]插件化原理 ( JVM 内存数据 | 类加载流程 ) [Android 插件 ...
- Mybatis底层原理学习(二):从源码角度分析一次查询操作过程
在阅读这篇文章之前,建议先阅读一下我之前写的两篇文章,对理解这篇文章很有帮助,特别是Mybatis新手: 写给mybatis小白的入门指南 mybatis底层原理学习(一):SqlSessionFac ...
- 从源码的角度分析MapReduce的map-input流程
前言 之前我们对MapReduce中Client提交Job作业的流程进行了源码分析(点击查看Client提交Job作业源码分析),今天我们来分析一下map-input阶段的源码. 源码位置 hadoo ...
- 从源码角度分析 Mybatis 工作原理
作者:vivo互联网服务器团队-Zhang Peng 一.MyBatis 完整示例 这里,我将以一个入门级的示例来演示 MyBatis 是如何工作的. 注:本文后面章节中的原理.源码部分也将基于这个示 ...
- Hadoop(十二):从源码角度分析Hadoo是如何将作业提交给集群的
为什么80%的码农都做不了架构师?>>> 一:MapReduce提交作业过程的流程图 通过图可知主要有三个部分,即: 1) JobClient:作业客户端. 2) JobTra ...
- 带你从源码角度分析ViewGroup中事件分发流程
序言 这篇博文不是对事件分发机制全面的介绍,只是从源码的角度分析ACTION_DOWN.ACTION_MOVE.ACTION_UP事件在ViewGroup中的分发逻辑,了解各个事件在ViewGroup ...
- 从源码角度分析Android中的Binder机制的前因后果
为什么在Android中使用binder通信机制? 众所周知linux中的进程通信有很多种方式,比如说管道.消息队列.socket机制等.socket我们再熟悉不过了,然而其作为一款通用的接口,通信开 ...
- 【易懂】Java源码角度分析put()与putIfAbsent()的区别——源码分析系列
一.put()方法 1. 源码分析 Java中并未给出put()的源码,因此我们看一下put()方法中给出的注释: Associates the specified value with the sp ...
最新文章
- 计算机二进制加减符号,(带符号的二进制数的表示方法及加减法运算).ppt
- 160个Crackme036
- linux 挂起 移动电脑,linux 系统挂起
- VMware QueryPerformanceCounter/GetTickCount 悬案
- android 删除模拟器,android – 如何从avd设备中删除脱机模拟器?
- 如何在openGauss 2.1.0中使用Job?
- 常考的java数据库笔试题
- 关于debian网卡驱动
- matlab中进行太阳能电池模型,基于Matlab的光伏发电系统仿真研究
- mysql表名大小写设置
- Word | 图片被文字遮挡
- CAPM模型的应用--回归模型中的Alpha, r_f
- 在校园网的环境下用树莓派搭建私人云
- 【积跬步以至千里】Markdownpad2报错: Html Rendering Error:An error occurred with the HTML rendering component。
- 圆周率不用计算机怎么算,为何圆周率算了这么多年还没算完?就连超级计算机都“无可奈何”...
- 考试管理系统-刷题系统案题目选项编写
- 李建忠老师-设计模式
- 登陆id显示无法连接服务器失败,无法连接id服务器失败怎么办
- 委内瑞拉经济衰退导致通货膨胀
- Android 翻页效果 电子书 (转)