前言

之前我们对MapReduce中Client提交Job作业的流程进行了源码分析(点击查看Client提交Job作业源码分析),今天我们来分析一下map-input阶段的源码。

源码位置

hadoop部署包下的hadoop-mapreduce-client-core-2.6.5.jar ,org.apache.hadoop.mapred包下有2个要看的:maptask和reducetask

这里,先看一下大体流程:

1.Mapper类中的run()方法

从context中取出Key、Value给到map。

public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}}

1.1MapTask类中的run()方法:

MapTask中根本没有main方法,是被被容器反射变成对象,然后调自己的run()方法。

首先对配置信息中取出来的ReduceTask的数量进行判断,为0就把资源全部给Map。否则,就会有Map和Sort,且资源配比为2:1。当然,我们在Client提交作业之前,可以通过配置ReduceTask的数量:job.setNumReduceTasks(99);

 @Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, ClassNotFoundException, InterruptedException {this.umbilical = umbilical;if (isMapTask()) {// If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress.//从配置信息中取出来的ReduceTask的数量如果为0,即没有Reduceif (conf.getNumReduceTasks() == 0) {mapPhase = getProgress().addPhase("map", 1.0f);} else {// If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%).mapPhase = getProgress().addPhase("map", 0.667f);sortPhase  = getProgress().addPhase("sort", 0.333f);}}TaskReporter reporter = startReporter(umbilical);boolean useNewApi = job.getUseNewMapper();initialize(job, getJobID(), reporter, useNewApi);// check if it is a cleanupJobTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}//判断是否使用新的apiif (useNewApi) {runNewMapper(job, splitMetaInfo, umbilical, reporter);} else {runOldMapper(job, splitMetaInfo, umbilical, reporter);}done(umbilical, reporter);}

2.RunNewMapper方法:

值得我们注意的是方法底部的try catch块。

map阶段大概要做的事就是:

  • 进行输入初始化,经历了输入初始化后,才会调用mapper的run方法(也就是Mapper类中的run方法)。
  • 框架最终是把Mapper中的run方法调起来了。run()方法执行完了,代表没有数据可处理了,于是就map阶段就结束了。
  • 然后input关闭,并置空。关闭output、置空(output的关闭里,包含了刷新)。
@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,InterruptedException {// make a task context so we can get the classes//任务上下文taskContext,里面包装了job对象。Client把jar包提交到HDFS了,MapTask//跑起来后要把jar包、配置信息下载回本地的,因此此时job对象里的配置信息和我们当初//一通set的配置信息是一样的。org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);// make a mapper//隐性理解为配置信息要取MapperClass,根据我们在配置中指定的自定义Mapper类,反射出具体的//Mapper类对象。换句话说,经过反射得到的Mapper对象,就是我们自定义Mapper对象org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);// make the input format//输入格式化,Client也用到了输入格式化FileInputFormat,用来计算切片清单org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);// rebuild the input split//重建切片(一个),一个split切片对应一个maptask。MapTask运行的时候,//它要把它的切片信息重构为一个对象,因为Client把切片信息序列化了,map环节//要反序列化才行org.apache.hadoop.mapreduce.InputSplit split = null;split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());LOG.info("Processing split: " + split);//构造,new一个记录读取器,传参:切片、输入格式化类org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());org.apache.hadoop.mapreduce.RecordWriter output = null;// get an output objectif (job.getNumReduceTasks() == 0) {output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);} else {output = new NewOutputCollector(taskContext, job, umbilical, reporter);}//构造mapContext,有了上下文org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);try {//输入初始化input.initialize(split, mapperContext);//调用的是Mapper的run()方法,mapperContext作为参数给到run方法,run又给到map输出mapper.run(mapperContext);mapPhase.complete();setPhase(TaskStatus.Phase.SORT);statusUpdate(umbilical);//关闭输入、输出,并置空input.close();input = null;output.close(mapperContext);output = null;} finally {closeQuietly(input);closeQuietly(output, mapperContext);}}

2.1 getMapperClass()方法

如出一辙的方式,通过配置拿着Key去取Mapper,默认返回Mapper.class

 @SuppressWarnings("unchecked")public Class<? extends Mapper<?,?,?,?>> getMapperClass() throws ClassNotFoundException {return (Class<? extends Mapper<?,?,?,?>>) conf.getClass(MAP_CLASS_ATTR, Mapper.class);}

2.2 getInputFormatClass方法

废话不多说了,一模一样的方式,默认返回TextInputFormat.class

@SuppressWarnings("unchecked")public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException {return (Class<? extends InputFormat<?,?>>) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);}

2.3 NewTrackingRecordReader

构造出来的行记录读取器LineRecordReader被this.real引用了,既然被引用了,就一定会被用到。

这里面的几个方法,应该有几个特别眼熟了:getCurrentKey(),getCurrentValue(),nextKeyValue()。Mapper类的run()调用过这三个方法。Mapper.class中的run()的数据哪来的?就是从real拿的!

NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,TaskReporter reporter,org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)throws InterruptedException, IOException {this.reporter = reporter;this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);List <Statistics> matchedStats = null;if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(), taskContext.getConfiguration());}fsStats = matchedStats;long bytesInPrev = getInputBytes(fsStats);//Client的输入格式化会计算split切片//Map端的输入格式化会给输入对象inputFormat创建一个行记录读取器this.real = inputFormat.createRecordReader(split, taskContext);long bytesInCurr = getInputBytes(fsStats);fileInputByteCounter.increment(bytesInCurr - bytesInPrev);}

real的类型为RecordReader:

2.3.1createRecordReader

输入格式化类inputFormat是TextInputFormat反射的出来的,所以找的时候,应该优先找TextInputFormat类下的createRecordReader方法。

Map端干活的就是行记录读取器LineRecordReader

@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) {String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");byte[] recordDelimiterBytes = null;if (null != delimiter)recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);//返回了行记录读取器return new LineRecordReader(recordDelimiterBytes);}

2.3.1.1 nextKeyValue

@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {long bytesInPrev = getInputBytes(fsStats);//这里的real指向的就是LineRecordReader对象boolean result = real.nextKeyValue();long bytesInCurr = getInputBytes(fsStats);if (result) {inputRecordCounter.increment(1);}fileInputByteCounter.increment(bytesInCurr - bytesInPrev);reporter.setProgress(getProgress());return result;}

2.4 MapContextImpl

传入input在这变成了RecordReader对象reader,且赋给了this.reader。既然他也有reader指向了RecordReader对象,那么它也有那几个熟悉的方法:
传进来的input是NewTrackingRecordReader对象,而NewTrackingRecordReader内部又包含LineRecordReader,说白了就是(mapContext上下文(NewTrackingRecordReader(LineRecordReader(三个方法:nextKeyValue、getCurrentKey、getCurrentValue))))层层包裹着。我调用mapContext的nextKeyValue,其实是调用的NewTrackingRecordReader的nextKeyValue,又进一步走的LinRecordReader的nextKeyValue,所以最终干活的就是LineRecordReader。

 public MapContextImpl(Configuration conf, TaskAttemptID taskid,RecordReader<KEYIN,VALUEIN> reader,RecordWriter<KEYOUT,VALUEOUT> writer,OutputCommitter committer,StatusReporter reporter,InputSplit split) {super(conf, taskid, writer, committer, reporter);this.reader = reader;this.split = split;}

2.5 initialize

对输入格式化类进行初始化,调用real.initialize方法,其实是对LineRecordReader初始化

@Overridepublic void initialize(org.apache.hadoop.mapreduce.InputSplit split,org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException, InterruptedException {long bytesInPrev = getInputBytes(fsStats);//借real,最终调用的LineRecordReader的initialize方法real.initialize(split, context);long bytesInCurr = getInputBytes(fsStats);fileInputByteCounter.increment(bytesInCurr - bytesInPrev);}

2.5.1 LineRecordReader的initialize方法

经过层层调用,最终到达的这,就是真实干活的地方了。在LineRecordReader这个类中,全都是真正干活的方法:
切片包含4个维度的东西:file、start、length、host。既然MapTask都跑起来了,证明计算向数据移动已经结束了。

该方法中,最为核心的就是:start += in.readLine(new Text(), 0, maxBytesToConsume(start));因为一个文件被切块是绝对按照字节数切,它根本就不考虑编码、单词等问题。所以“hello world”中“hel”被切到了Block1,“lo world”被切进了第二个块。默认splitSize=blockSize,所以切片也是如此。为了不破坏数据完整性,每个split切片在工作之前,初始化时,split的偏移量下移一行,所以Block2就不会处理第一行的“lo world”,应该交由Block1处理。然后对偏移量进行“修正”,并赋值给pos

public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);//从切片中得到起始偏移量start = split.getStart();//起始偏移量+切片长度=结束的偏移量end = start + split.getLength();//从切片中得到隶属于哪个文件的路径final Path file = split.getPath();// open the file and seek to the start of the split//拿到分布式文件系统对象,job里有配置信息final FileSystem fs = file.getFileSystem(job);//打开文件,得到对程序的输入流,对文件的读取fileIn = fs.open(file);CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);if (null!=codec) {isCompressedInput = true;  decompressor = CodecPool.getDecompressor(codec);if (codec instanceof SplittableCompressionCodec) {final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.BYBLOCK);in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);start = cIn.getAdjustedStart();end = cIn.getAdjustedEnd();filePosition = cIn;} else {in = new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);filePosition = fileIn;}} else {//跳到自己切片的偏移量fileIn.seek(start);//得到SplitLineReader,真正能读数据的就是fileIn。经过如此包装,in也可以读了in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());filePosition = fileIn;}// If this is not the first split, we always throw away first record// because we always (except the last split) read one extra line in// next() method.//只有第1个split偏移量是0,也就是说第1个split不会走这,别的split都走这if (start != 0) {//读一行,扔给new Text()的对象//readLine方法一调用,返回的是“读到了多少个字节”。承载的Text()对象是匿名的,//没有一个栈里的明确的变量引用,匿名就意味着上下文根本就取不到它的内容。我又想//读这一行,这一行又取不到,意味着这一行就被浪费了。//修正偏移量,起始位置就成了第二行的偏移量start += in.readLine(new Text(), 0, maxBytesToConsume(start));}this.pos = start;}

2.6 mapper.run(mapperContext)方法

调用的是Mapper的run()方法,mapperContext作为参数给到run方法,run又给到map输出。Mapper中的run()方法在while循环中判断context.nextKeyValue,而nextKeyValue是经过一层包装起来的,最终干活的还是LineRecordReader中的nextKeyValue方法。

2.6.1 LineRecordReader下的nextKeyValue方法

只要Mapper的run方法一调起,while(context.nextKeyValue)做的事情就是定义K-V类型并赋值。同时本身boolean判断。有值就返回true,没值了就返回false。

public boolean nextKeyValue() throws IOException {//MapTask一运行,key的类型就是LongWritable类型,原因就在这,因为key是偏移量if (key == null) {key = new LongWritable();}key.set(pos);//pos就是行偏移量//value是Text类型if (value == null) {value = new Text();}int newSize = 0;// We always read one extra line, which lies outside the upper// split limit i.e. (end - 1)while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {if (pos == 0) {newSize = skipUtfByteOrderMark();} else {//这读的就是第2行newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));pos += newSize;//对pos进行更新,下次循环再来一波readLine,读的就是第3行的内容了}if ((newSize == 0) || (newSize < maxLineLength)) {break;}// line too long. try againLOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}

Mapper类中run方法被调用,while内部调用context.getCurrentKey和context.getCurrentValue,底层实际上调用的是LineRecordReader的getCurrentKey和getCurrentValue方法:

 @Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}

返回key和value,来源就是调用nextKeyValue方法是,该方法早就把key和value准备好了。下面就可以直接传参、计算了。

从源码的角度分析MapReduce的map-input流程相关推荐

  1. 从Android源码的角度分析Binder机制

    IPC 为了弄懂IPC的来龙去脉,我将从以下三个方面为大家来讲解,希望对大家理解IPC会有帮助 什么是IPC IPC是Inter Process Communication的缩写,其意思就是进程间的通 ...

  2. 带你从源码角度分析ViewGroup中事件分发流程

    序言 这篇博文不是对事件分发机制全面的介绍,只是从源码的角度分析ACTION_DOWN.ACTION_MOVE.ACTION_UP事件在ViewGroup中的分发逻辑,了解各个事件在ViewGroup ...

  3. 【转】Android事件分发机制完全解析,带你从源码的角度彻底理解(下)

    转载请注明出处:http://blog.csdn.net/guolin_blog/article/details/9153761 记得在前面的文章中,我带大家一起从源码的角度分析了Android中Vi ...

  4. Android事件分发机制完全解析,带你从源码的角度彻底理解(上)

    <div id="container">         <div id="header">     <div class=&qu ...

  5. [学习总结]7、Android AsyncTask完全解析,带你从源码的角度彻底理解

    我们都知道,Android UI是线程不安全的,如果想要在子线程里进行UI操作,就需要借助Android的异步消息处理机制.之前我也写过了一篇文章从源码层面分析了Android的异步消息处理机制,感兴 ...

  6. 从 Android 6.0 源码的角度剖析 Binder 工作原理 | CSDN 博文精选

    在从Android 6.0源码的角度剖析Activity的启动过程一文(https://blog.csdn.net/AndrExpert/article/details/81488503)中,我们了解 ...

  7. Android Fragment 从源码的角度去解析(上)

    ###1.概述 本来想着昨天星期五可以早点休息,今天可以早点起来跑步,可没想到事情那么的多,晚上有人问我主页怎么做到点击才去加载Fragment数据,而不是一进入主页就去加载所有的数据,在这里自己就对 ...

  8. 从Android 6.0源码的角度剖析View的绘制原理

    在从Android 6.0源码的角度剖析Activity的启动过程和从Android 6.0源码的角度剖析Window内部机制原理的文章中,我们分别详细地阐述了一个界面(Activity)从启动到显示 ...

  9. JAVA源码优化、分析工具

    JAVA源码优化.分析工具 一.11款用于优化.分析源代码的Java工具 1. PMD from http://pmd.sourceforge.net/ PMD能够扫描Java 源代码,查找类似以下的 ...

最新文章

  1. 六年磨一剑,全时发布音视频会议平台TANG,多款新品亮相
  2. 基本的输入输出函数介绍
  3. 解决 Android ping IPv6 地址显示 network is unreachable 的问题
  4. 偶尔用得上的MySQL操作
  5. 【word2vec】篇三:基于Negative Sampling 的 CBOW 模型和 Skip-gram 模型
  6. linux img 内核启动,linux的启动流程(initrd.img)
  7. 引用js_js值和引用
  8. 每天干的啥?(2019.3)
  9. 一个普通买房者亲历的房价跳涨事件
  10. C 语言结构体成员赋值的深拷贝和浅拷贝
  11. random.choice与random.choices
  12. 3D数字孪生大屏怎么做?你需要了解这款数据可视化软件
  13. ENVI--气象及环境卫星数据处理
  14. 贝叶斯决策论(一):贝叶斯决策理论
  15. texlive2021
  16. 双碑零基础法语学习 学习法语要知道哪些法语常识?
  17. html屏幕缩小图片不失真,html图片失真怎么办
  18. java中所有import意思,java 程序中用到的所有类都必须使用 import 语句。
  19. CSAPP导读第3章 程序的机器级表示
  20. 参加第一场多校大一训练赛后的感想

热门文章

  1. 7 centos 设置jvmgc_centos7配置java环境变量
  2. js获取当前日期_vue项目中获取前后N天日期
  3. ArtemisMQ的“未消费之谜”
  4. Exchange数据库无法装载的问题
  5. Ubuntu架设FTP
  6. mysql 插入汉字 异常 Incorrect string value: '\xE8\xA7\x84\xE5\x88\x99' for column 'name'
  7. linux下screen工具使用
  8. netty tcp 字节有序-gt;对象有序
  9. 《MonkeyRunner原理剖析》第九章-MonkeyImage实现原理 - 概览
  10. Linux正变得无处不在;应用大盘点