Hadoop Source Code Series -- Reduce
1. Introduction
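The reduce task fetches many small files from the map tasks. Each file is sorted internally, but taken together they are not in order. Reduce merges these small files with a merge algorithm (merge sort) into a single, globally sorted stream of records.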
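The merge step can be sketched as a classic k-way merge. This is a minimal sketch, assuming each map output is an in-memory list of ints that is already sorted. Hadoop's real merge (the `Merger` class used by the shuffle) works on serialized key/value segments in memory and on disk. The class name `SortedRunMerge` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not Hadoop's Merger: k-way merge of runs that are each
// already sorted, the way the reduce side combines the sorted map outputs.
class SortedRunMerge {

    // Heap entry: the current head value of one run plus the rest of that run.
    private static final class Head implements Comparable<Head> {
        final int value;
        final Iterator<Integer> rest;
        Head(int value, Iterator<Integer> rest) { this.value = value; this.rest = rest; }
        @Override public int compareTo(Head o) { return Integer.compare(value, o.value); }
    }

    static List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<Head> heap = new PriorityQueue<>();
        for (List<Integer> run : runs) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();            // smallest head across all runs
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {          // refill from the same run
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
            Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 9, 10]
    }
}
```

Only the heads of the runs sit in the heap. Each run is read once, front to back, and no run is ever re-sorted. This is why internally sorted inputs are enough to produce a globally sorted result.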
2. Code
ReduceTask source (ReduceTask.run):
    public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

      if (isMapOrReduce()) {
        copyPhase = getProgress().addPhase("copy");
        sortPhase  = getProgress().addPhase("sort");
        reducePhase = getProgress().addPhase("reduce");
      }
      // start thread that will handle communication with parent
      TaskReporter reporter = startReporter(umbilical);

      boolean useNewApi = job.getUseNewReducer();
      initialize(job, getJobID(), reporter, useNewApi);

      // check if it is a cleanupJobTask
      if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
      }
      if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
      }
      if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
      }

      // Initialize the codec
      codec = initCodec();
      RawKeyValueIterator rIter = null;
      ShuffleConsumerPlugin shuffleConsumerPlugin = null;

      Class combinerClass = conf.getCombinerClass();
      CombineOutputCollector combineCollector =
        (null != combinerClass) ?
          new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

      Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

      ShuffleConsumerPlugin.Context shuffleContext =
        new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                    super.lDirAlloc, reporter, codec,
                    combinerClass, combineCollector,
                    spilledRecordsCounter, reduceCombineInputCounter,
                    shuffledMapsCounter,
                    reduceShuffleBytes, failedShuffleCounter,
                    mergedMapOutputsCounter,
                    taskStatus, copyPhase, sortPhase, this,
                    mapOutputFile, localMapFiles);
      shuffleConsumerPlugin.init(shuffleContext);

      rIter = shuffleConsumerPlugin.run();   // iterator over the merged records, in sorted order

      // free up the data structures
      mapOutputFilesOnDisk.clear();

      sortPhase.complete();                  // sort is complete
      setPhase(TaskStatus.Phase.REDUCE);
      statusUpdate(umbilical);
      Class keyClass = job.getMapOutputKeyClass();
      Class valueClass = job.getMapOutputValueClass();
      RawComparator comparator = job.getOutputValueGroupingComparator();  // grouping comparator, see Source 1

      if (useNewApi) {
        runNewReducer(job, umbilical, reporter, rIter, comparator,       // see Source 2
                      keyClass, valueClass);
      } else {
        runOldReducer(job, umbilical, reporter, rIter, comparator,
                      keyClass, valueClass);
      }

      shuffleConsumerPlugin.close();
      done(umbilical, reporter);
    }

Source 1: the grouping comparator (JobConf.getOutputValueGroupingComparator)

    public RawComparator getOutputValueGroupingComparator() {
      Class<? extends RawComparator> theClass = getClass(
        JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
      // the user did not set a grouping comparator: fall back to the default
      if (theClass == null) {
        return getOutputKeyComparator();   // see Source 1.1
      }
      return ReflectionUtils.newInstance(theClass, this);
    }
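Source 1.1: the sort comparator. When the user has not set a grouping comparator, the sort comparator is used in its place. If the user configured a sort comparator, that one is used. Otherwise the default comparator registered for the map output key class is used.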
    public RawComparator getOutputKeyComparator() {
      Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
      if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
      return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
    }
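The fallback chain in Source 1 and Source 1.1 (grouping comparator, then sort comparator, then the key class's default comparator) can be modeled in plain Java. This is a sketch only. A `Map` stands in for `JobConf`, and the configuration key strings and the class `ComparatorFallback` are hypothetical, not Hadoop property names. `Comparator.naturalOrder()` plays the role of `WritableComparator.get(keyClass)`.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of JobConf's comparator lookup order; the keys below are
// made-up placeholders, not Hadoop configuration properties.
class ComparatorFallback {

    static final String GROUPING_KEY = "grouping.comparator";  // hypothetical
    static final String SORT_KEY = "sort.comparator";          // hypothetical

    // Like getOutputValueGroupingComparator(): grouping -> sort -> default.
    static Comparator<String> groupingComparator(Map<String, Comparator<String>> conf) {
        Comparator<String> grouping = conf.get(GROUPING_KEY);
        if (grouping != null) return grouping;
        return sortComparator(conf);
    }

    // Like getOutputKeyComparator(): user sort comparator, else the key's default.
    static Comparator<String> sortComparator(Map<String, Comparator<String>> conf) {
        Comparator<String> sort = conf.get(SORT_KEY);
        if (sort != null) return sort;
        return Comparator.naturalOrder();   // stands in for WritableComparator.get(keyClass)
    }

    public static void main(String[] args) {
        Map<String, Comparator<String>> conf = new HashMap<>();
        // nothing configured: "a#1" and "a#2" are different groups
        System.out.println(groupingComparator(conf).compare("a#1", "a#2") == 0); // false
        // group only on the part before '#', as in a secondary sort
        conf.put(GROUPING_KEY, Comparator.comparing((String k) -> k.split("#")[0]));
        System.out.println(groupingComparator(conf).compare("a#1", "a#2") == 0); // true
    }
}
```

The second call in `main` shows why the grouping comparator is separate from the sort comparator. Keys can be sorted on their full value but grouped on only part of it.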
Summary:
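The map side is where keys are actually reordered (sorted). The reduce side never rearranges the records it fetches and does not re-sort them. It only merges the already-sorted map outputs, so it relies heavily on the ordering produced by the map side.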
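The dependence on sorted input shows up in grouping. The reduce side starts a new group whenever the next key differs from the current one, so only adjacent equal keys end up in the same `reduce()` call. The sketch below is hypothetical (`AdjacentGrouping` is not a Hadoop class) and uses `equals` in place of the grouping comparator. It lists the `reduce()` calls that sorted and unsorted input would trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: groups are formed from *adjacent* equal keys only,
// mirroring how nextKeyIsSame compares each key with the one after it.
class AdjacentGrouping {

    // Returns the keys in the order reduce() would be called for them.
    static List<String> reduceCalls(List<String> keys) {
        List<String> calls = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            // a new group starts whenever the key differs from the previous one
            if (i == 0 || !keys.get(i).equals(keys.get(i - 1))) {
                calls.add(keys.get(i));
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> unsorted = Arrays.asList("b", "a", "b", "a");
        List<String> sorted = new ArrayList<>(unsorted);
        Collections.sort(sorted);
        System.out.println(reduceCalls(unsorted)); // [b, a, b, a]
        System.out.println(reduceCalls(sorted));   // [a, b]
    }
}
```

With unsorted input the same key would reach `reduce()` several times. This is why the reduce side depends on receiving keys already in order.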
Source 2: the implementation of runNewReducer
    @SuppressWarnings("unchecked")
    private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewReducer(JobConf job,
                       final TaskUmbilicalProtocol umbilical,
                       final TaskReporter reporter,
                       RawKeyValueIterator rIter,
                       RawComparator<INKEY> comparator,
                       Class<INKEY> keyClass,
                       Class<INVALUE> valueClass
                       ) throws IOException, InterruptedException,
                                ClassNotFoundException {
      // wrap value iterator to report progress.
      final RawKeyValueIterator rawIter = rIter;   // the real (underlying) iterator
      rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
          rawIter.close();
        }
        public DataInputBuffer getKey() throws IOException {
          return rawIter.getKey();
        }
        public Progress getProgress() {
          return rawIter.getProgress();
        }
        public DataInputBuffer getValue() throws IOException {
          return rawIter.getValue();
        }
        public boolean next() throws IOException {
          boolean ret = rawIter.next();
          reporter.setProgress(rawIter.getProgress().getProgress());
          return ret;
        }
      };
      // make a task context so we can get the classes
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
            getTaskID(), reporter);
      // make a reducer
      org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
        (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
        new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
      job.setBoolean("mapred.skip.on", isSkipping());
      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
      org.apache.hadoop.mapreduce.Reducer.Context
        reducerContext = createReduceContext(reducer, job, getTaskID(),  // see Source 2.1
                                             rIter,   // the iterator is handed to the context here
                                             reduceInputKeyCounter,
                                             reduceInputValueCounter,
                                             trackedRW,
                                             committer,
                                             reporter, comparator,       // grouping comparator
                                             keyClass,
                                             valueClass);
      try {
        reducer.run(reducerContext);   // once the context is built, run Reducer.run(); see Source 2.2
      } finally {
        trackedRW.close(reducerContext);
      }
    }
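The anonymous `RawKeyValueIterator` in `runNewReducer` is a decorator. It forwards every call to the real iterator and reports progress on each `next()`. The sketch below shows the same pattern with `java.util.Iterator`. `ProgressIterator` and the `IntConsumer` callback are hypothetical stand-ins for `RawKeyValueIterator` and `TaskReporter`. For simplicity it reports a record count rather than a progress fraction.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical decorator mirroring the wrapper in runNewReducer(): all data
// comes from the wrapped iterator; the wrapper only adds progress reporting.
class ProgressIterator<T> implements Iterator<T> {
    private final Iterator<T> raw;        // the real iterator
    private final IntConsumer reporter;   // receives the number of records read so far
    private int consumed = 0;

    ProgressIterator(Iterator<T> raw, IntConsumer reporter) {
        this.raw = raw;
        this.reporter = reporter;
    }

    @Override public boolean hasNext() { return raw.hasNext(); }

    @Override public T next() {
        T item = raw.next();          // delegate to the real iterator first
        reporter.accept(++consumed);  // then report, as the wrapped next() does
        return item;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("k1", "k2", "k3");
        Iterator<String> it = new ProgressIterator<>(data.iterator(),
            n -> System.out.println("progress: " + n + "/" + data.size()));
        while (it.hasNext()) it.next();
    }
}
```

Wrapping keeps progress reporting out of both the shuffle code and the context code. Neither side needs to know the reporter exists.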
Source 2.1: building the context. createReduceContext creates a ReduceContextImpl. Its constructor and the key methods are:
    public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                             RawKeyValueIterator input,   // the real iterator becomes the context's input
                             Counter inputKeyCounter,
                             Counter inputValueCounter,
                             RecordWriter<KEYOUT,VALUEOUT> output,
                             OutputCommitter committer,
                             StatusReporter reporter,
                             RawComparator<KEYIN> comparator,
                             Class<KEYIN> keyClass,
                             Class<VALUEIN> valueClass
                            ) throws InterruptedException, IOException {
      super(conf, taskid, output, committer, reporter);
      this.input = input;
      this.inputKeyCounter = inputKeyCounter;
      this.inputValueCounter = inputValueCounter;
      this.comparator = comparator;
      this.serializationFactory = new SerializationFactory(conf);
      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
      this.keyDeserializer.open(buffer);
      this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
      this.valueDeserializer.open(buffer);
      hasMore = input.next();
      this.keyClass = keyClass;
      this.valueClass = valueClass;
      this.conf = conf;
      this.taskid = taskid;
    }

    /** Start processing next unique key. */
    public boolean nextKey() throws IOException, InterruptedException {
      // this is what context.nextKey() in Reducer.run() actually executes
      while (hasMore && nextKeyIsSame) {   // false on the first call; later it skips values reduce() left unread
        nextKeyValue();
      }
      if (hasMore) {
        if (inputKeyCounter != null) {
          inputKeyCounter.increment(1);
        }
        return nextKeyValue();
      } else {
        return false;
      }
    }

    /**
     * Advance to the next key/value pair.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!hasMore) {
        key = null;
        value = null;
        return false;
      }
      firstValue = !nextKeyIsSame;
      DataInputBuffer nextKey = input.getKey();
      currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                        nextKey.getLength() - nextKey.getPosition());
      buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
      key = keyDeserializer.deserialize(key);
      DataInputBuffer nextVal = input.getValue();
      buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
          - nextVal.getPosition());
      value = valueDeserializer.deserialize(value);

      currentKeyLength = nextKey.getLength() - nextKey.getPosition();
      currentValueLength = nextVal.getLength() - nextVal.getPosition();

      if (isMarked) {
        backupStore.write(nextKey, nextVal);
      }

      hasMore = input.next();
      if (hasMore) {
        nextKey = input.getKey();
        // is the next key equal to the current one (per the grouping comparator)?
        nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                           currentRawKey.getLength(),
                                           nextKey.getData(),
                                           nextKey.getPosition(),
                                           nextKey.getLength() - nextKey.getPosition()
                                           ) == 0;
      } else {
        nextKeyIsSame = false;
      }
      inputValueCounter.increment(1);
      return true;
    }

    public KEYIN getCurrentKey() {
      return key;
    }

    @Override
    public VALUEIN getCurrentValue() {
      return value;
    }
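`nextKeyValue()` decides `nextKeyIsSame` by comparing the two keys in serialized form, through `comparator.compare(b1, s1, l1, b2, s2, l2)`, without deserializing the next key. The sketch below shows an unsigned lexicographic comparison of two byte ranges, similar in spirit to `WritableComparator.compareBytes`. It assumes an encoding where byte order matches key order, which holds for the ASCII strings used here. Hadoop's comparators for specific types may decode the bytes instead. `RawBytesCompare` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a raw (byte-level) key comparison. It is only a valid
// key order for encodings whose byte order equals the key order.
class RawBytesCompare {

    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int a = b1[s1 + i] & 0xff;   // treat bytes as unsigned
            int b = b2[s2 + i] & 0xff;
            if (a != b) return a - b;
        }
        return l1 - l2;                  // a shorter prefix sorts first
    }

    public static void main(String[] args) {
        byte[] buf = "applebanana".getBytes(StandardCharsets.US_ASCII);
        byte[] current = "apple".getBytes(StandardCharsets.US_ASCII);
        // compare the current key with the first 5 bytes of a shared buffer
        boolean nextKeyIsSame = compare(current, 0, current.length, buf, 0, 5) == 0;
        System.out.println(nextKeyIsSame); // true
    }
}
```

The offset and length parameters let the comparison read directly from a shared buffer, as `nextKey.getData()` and `nextKey.getPosition()` do above, without copying the key out first.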
Source 2.2: Reducer
    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.hadoop.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

    import java.util.Iterator;

    /**
     * ...
     * <p><blockquote><pre>
     * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
     *                                                 Key,IntWritable> {
     *   private IntWritable result = new IntWritable();
     *
     *   public void reduce(Key key, Iterable<IntWritable> values,
     *                      Context context) throws IOException, InterruptedException {
     *     int sum = 0;
     *     for (IntWritable val : values) {
     *       sum += val.get();
     *     }
     *     result.set(sum);
     *     context.write(key, result);
     *   }
     * }
     * </pre></blockquote></p>
     *
     * @see Mapper
     * @see Partitioner
     */
    @Checkpointable
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

      /**
       * The <code>Context</code> passed on to the {@link Reducer} implementations.
       */
      public abstract class Context
        implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }

      /**
       * Called once at the start of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * This method is called once for each key. Most applications will define
       * their reduce class by overriding this method. The default implementation
       * is an identity function.
       */
      @SuppressWarnings("unchecked")
      protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                            ) throws IOException, InterruptedException {
        for(VALUEIN value: values) {
          context.write((KEYOUT) key, (VALUEOUT) value);
        }
      }

      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * Advanced application writers can use the
       * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
       * control how the reduce task works.
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          // nextKey() calls nextKeyValue(), which updates hasMore, nextKeyIsSame, key and value
          while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);   // see Source 2.2.1
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
            }
          }
        } finally {
          cleanup(context);
        }
      }
    }
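`Reducer.run()` is a template method. It calls `setup()` once, calls `reduce()` once per key, and calls `cleanup()` once in a `finally` block. The default `reduce()` is the identity function. The sketch below keeps that shape but simplifies the input. `MiniReducer`, `IntSum` and the pre-grouped input list are hypothetical, not Hadoop classes, and the outputs go to a list instead of `context.write()`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ReducerTemplateSketch {

    // Hypothetical, simplified Reducer: input arrives already grouped and
    // results are collected in a list instead of written through a context.
    static abstract class MiniReducer<K, V, KO, VO> {
        final List<Map.Entry<KO, VO>> output = new ArrayList<>();

        protected void setup() { }      // called once before the first key
        protected void cleanup() { }    // called once at the end, even on failure

        // Default is the identity function, as in Reducer.reduce().
        @SuppressWarnings("unchecked")
        protected void reduce(K key, Iterable<V> values) {
            for (V v : values) output.add(new SimpleEntry<>((KO) key, (VO) v));
        }

        // Template method with the same shape as Reducer.run().
        public void run(Iterator<Map.Entry<K, List<V>>> groups) {
            setup();
            try {
                while (groups.hasNext()) {                  // role of context.nextKey()
                    Map.Entry<K, List<V>> group = groups.next();
                    reduce(group.getKey(), group.getValue());
                }
            } finally {
                cleanup();
            }
        }
    }

    // Counterpart of the IntSumReducer example from the Javadoc.
    static class IntSum<K> extends MiniReducer<K, Integer, K, Integer> {
        @Override
        protected void reduce(K key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) sum += v;
            output.add(new SimpleEntry<>(key, sum));
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, List<Integer>>> input = Arrays.<Map.Entry<String, List<Integer>>>asList(
            new SimpleEntry<String, List<Integer>>("a", Arrays.asList(1, 2)),
            new SimpleEntry<String, List<Integer>>("b", Arrays.asList(5)));
        IntSum<String> r = new IntSum<>();
        r.run(input.iterator());
        System.out.println(r.output); // [a=3, b=5]
    }
}
```

Subclasses override only `reduce()` (and optionally `setup()`/`cleanup()`). The loop itself stays in the base class, which is why most jobs never touch `run()`.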
Source 2.2.1: context.getValues() ultimately hands reduce() an iterator, ReduceContextImpl.ValueIterator:
    protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

      private boolean inReset = false;
      private boolean clearMarkFlag = false;

      @Override
      public boolean hasNext() {
        try {
          if (inReset && backupStore.hasNext()) {
            return true;
          }
        } catch (Exception e) {
          e.printStackTrace();
          throw new RuntimeException("hasNext failed", e);
        }
        return firstValue || nextKeyIsSame;
      }

      @Override
      public VALUEIN next() {
        if (inReset) {
          try {
            if (backupStore.hasNext()) {
              backupStore.next();
              DataInputBuffer next = backupStore.nextValue();
              buffer.reset(next.getData(), next.getPosition(), next.getLength()
                  - next.getPosition());
              value = valueDeserializer.deserialize(value);
              return value;
            } else {
              inReset = false;
              backupStore.exitResetMode();
              if (clearMarkFlag) {
                clearMarkFlag = false;
                isMarked = false;
              }
            }
          } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("next value iterator failed", e);
          }
        }

        // if this is the first record, we don't need to advance
        if (firstValue) {
          firstValue = false;
          return value;
        }
        // if this isn't the first record and the next key is different, they
        // can't advance it here.
        if (!nextKeyIsSame) {
          throw new NoSuchElementException("iterate past last value");
        }
        // otherwise, go to the next key/value pair
        try {
          // this iterator holds no data itself: next() calls nextKeyValue(),
          // which reads from input, the real iterator
          nextKeyValue();
          return value;
        } catch (IOException ie) {
          throw new RuntimeException("next value iterator failed", ie);
        } catch (InterruptedException ie) {
          // this is bad, but we can't modify the exception list of java.util
          throw new RuntimeException("next value iterator interrupted", ie);
        }
      }

      // remaining methods of the class are omitted here
    }
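The interplay of `nextKey()`, `nextKeyValue()` and `ValueIterator` can be reproduced in a small, self-contained model. The field names (`hasMore`, `nextKeyIsSame`, `firstValue`) mirror the Hadoop code above. `GroupingContext` itself is a hypothetical simplification: it works on objects instead of serialized bytes and has no counters or backup store. It streams one group at a time from a single sorted iterator, so no group is ever buffered.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class GroupingSketch {

    // Hypothetical model of ReduceContextImpl: objects instead of raw bytes,
    // no counters, no backup store.
    static class GroupingContext<K, V> {
        private final Iterator<Map.Entry<K, V>> input;  // the "real" iterator
        private final Comparator<K> comparator;         // grouping comparator
        private Map.Entry<K, V> lookahead;              // record after the current one
        private K key;
        private V value;
        private boolean hasMore;
        private boolean nextKeyIsSame = false;
        private boolean firstValue = false;

        GroupingContext(Iterator<Map.Entry<K, V>> input, Comparator<K> comparator) {
            this.input = input;
            this.comparator = comparator;
            hasMore = input.hasNext();
            if (hasMore) lookahead = input.next();
        }

        // Make the lookahead record current, then peek at the one after it.
        boolean nextKeyValue() {
            if (!hasMore) { key = null; value = null; return false; }
            firstValue = !nextKeyIsSame;
            key = lookahead.getKey();
            value = lookahead.getValue();
            hasMore = input.hasNext();
            if (hasMore) {
                lookahead = input.next();
                nextKeyIsSame = comparator.compare(key, lookahead.getKey()) == 0;
            } else {
                nextKeyIsSame = false;
            }
            return true;
        }

        // Skip any unread values of the current key, then load the next key.
        boolean nextKey() {
            while (hasMore && nextKeyIsSame) nextKeyValue();
            return hasMore && nextKeyValue();
        }

        K getCurrentKey() { return key; }

        // Value iterator for the current key; it stores no data of its own.
        Iterator<V> values() {
            return new Iterator<V>() {
                @Override public boolean hasNext() { return firstValue || nextKeyIsSame; }
                @Override public V next() {
                    if (firstValue) { firstValue = false; return value; }  // loaded by nextKey()
                    if (!nextKeyIsSame) throw new NoSuchElementException("iterate past last value");
                    nextKeyValue();  // advance the real iterator
                    return value;
                }
            };
        }
    }

    // The Reducer.run() loop with an IntSum-style reduce inlined.
    static List<String> sumByKey(List<Map.Entry<String, Integer>> sorted) {
        GroupingContext<String, Integer> ctx =
            new GroupingContext<>(sorted.iterator(), Comparator.<String>naturalOrder());
        List<String> out = new ArrayList<>();
        while (ctx.nextKey()) {
            int sum = 0;
            for (Iterator<Integer> it = ctx.values(); it.hasNext(); ) sum += it.next();
            out.add(ctx.getCurrentKey() + "=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = Arrays.<Map.Entry<String, Integer>>asList(
            new SimpleEntry<>("a", 1), new SimpleEntry<>("a", 2), new SimpleEntry<>("b", 5));
        System.out.println(sumByKey(sorted)); // [a=3, b=5]
    }
}
```

The value iterator's `hasNext()` is just `firstValue || nextKeyIsSame`, so a group ends exactly where the comparator first reports a different key. If `reduce()` stops early, the `while` loop in `nextKey()` drains the rest of the group.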
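Summary: the code above describes a single flow.

1. The reduce task fetches the map outputs and wraps the merged result in an iterator, the real iterator (rIter).
2. ReduceContextImpl builds its methods on top of this iterator, most importantly nextKeyValue(). That method updates the current key and value indirectly, by deserializing the bytes the iterator points to.
3. Reducer.run() has a while loop that calls nextKey(), which itself calls nextKeyValue(). The loop then calls reduce(), passing in context.getCurrentKey() and context.getValues().
4. reduce() walks the values with hasNext() and next(). next() again calls nextKeyValue(), so the data always comes out of the real iterator. The value iterator holds no data of its own.
5. The key flag is nextKeyIsSame, a field of ReduceContextImpl. After each record, nextKeyValue() sets it by comparing the current key with the real iterator's next key using the grouping comparator. hasNext() uses the flag to decide whether the next record still has the same key, so one call to reduce() consumes exactly one group of data.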
PS: one more point:
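next() calls nextKeyValue(), which overwrites key and value in place. The deserializer fills the same key and value objects each time, so reduce() receives references to those same objects, and every call to next() replaces the data in that same memory. A value that must be kept after the iterator moves on has to be copied first.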
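Object reuse has a practical consequence: storing the objects returned by the value iterator without copying them keeps only the last value. The sketch below reproduces the effect. `ReusingIterator` fills one shared object on every `next()`, as `valueDeserializer.deserialize(value)` does with a reused `Writable`. `MutableInt` is a hypothetical stand-in for `IntWritable`, and all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ObjectReuseSketch {

    // Hypothetical stand-in for a mutable Writable such as IntWritable.
    static final class MutableInt {
        int v;
        MutableInt(int v) { this.v = v; }
        @Override public String toString() { return Integer.toString(v); }
    }

    // Returns the *same* object from every next(), overwritten in place.
    static final class ReusingIterator implements Iterator<MutableInt> {
        private final int[] data;
        private int pos = 0;
        private final MutableInt reused = new MutableInt(0);  // one shared object
        ReusingIterator(int... data) { this.data = data; }
        @Override public boolean hasNext() { return pos < data.length; }
        @Override public MutableInt next() {
            reused.v = data[pos++];   // overwrite in place, like deserialize(value)
            return reused;
        }
    }

    static List<MutableInt> keepReferences(int... data) {
        List<MutableInt> kept = new ArrayList<>();
        for (Iterator<MutableInt> it = new ReusingIterator(data); it.hasNext(); ) {
            kept.add(it.next());                    // stores the same reference each time
        }
        return kept;
    }

    static List<MutableInt> keepCopies(int... data) {
        List<MutableInt> kept = new ArrayList<>();
        for (Iterator<MutableInt> it = new ReusingIterator(data); it.hasNext(); ) {
            kept.add(new MutableInt(it.next().v));  // copy before storing
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keepReferences(1, 2, 3)); // [3, 3, 3]
        System.out.println(keepCopies(1, 2, 3));     // [1, 2, 3]
    }
}
```

Reusing one object avoids allocating a new one per record. The cost is that any reference kept across iterations silently changes value.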
To be continued... You are welcome to follow my WeChat official account, LHWorld.
Reposted from: https://www.cnblogs.com/LHWorldBlog/p/8254143.html