Dream car 镇楼 ~ !

接上一节Input环节,接下来分析 output环节。代码在runNewMapper()方法中:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) {.......// 这个out也被包含在map的上下文当中了,所以在map方法中的输出,调用的是output的write方法org.apache.hadoop.mapreduce.RecordWriter output = null;// 记住这个数值  0 if (job.getNumReduceTasks() == 0) {  // 判断ReduceTask的数量output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);} else {    // > 0// 创建一个 Collector 对象  【看构造源码可以知道输出的时候是需要分区的】output = new NewOutputCollector(taskContext, job, umbilical, reporter);}//  -----------new NewOutputCollector() begin ------------------NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,JobConf job,TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException {//1、 赋值操作。先不仔细看,跳过~  下一段说collector = createSortingCollector(job, reporter);// 2、有多少个reducetask 就有多少个分区// 回忆:一个分区可以有若干组,相同的key为一组partitions = jobContext.getNumReduceTasks();if (partitions > 1) {partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)// 常见套路:反射生成实例对象,如果有自定义分区器,则不使用默认的// 默认的分区算法是简单的hash取模,会保证相同的key在一组ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);} else {  // reducetask = 1,所有的组都会进入一个分区partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {// 返回分区号,返回的值固定为 0public int getPartition(K key, V value, int numPartitions) {return partitions - 1;}};}}
//  -----------new NewOutputCollector()  end ------------------//  -----------write(K key, V value) begin ------------------// output往外写的时候带着 (k v p)  三元组       public void write(K key, V value) throws IOException, InterruptedException {collector.collect(key, value,partitioner.getPartition(key, value, partitions));
//  -----------write(K key, V value) end --------------------..............                          }

createSortingCollector(job, reporter)方法进去:

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>createSortingCollector(JobConf job, TaskReporter reporter)throws IOException, ClassNotFoundException {// 反射创建collector实例MapOutputCollector<KEY, VALUE> collector= (MapOutputCollector<KEY, VALUE>)// 常见套路:如果没有用户自定义collector,那么就取默认的ReflectionUtils.newInstance(job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,// MapOutputBuffer 这玩意牛逼,后边再说。MapOutputBuffer.class, MapOutputCollector.class), job);MapOutputCollector.Context context =new MapOutputCollector.Context(this, job, reporter);// 初始化的就是 MapOutputBuffer,真正要使用它之前要初始化。// 重要方法,下段分析collector.init(context);return collector;}

重头戏了,进入初始化环节:collector.init(context) ,删除非核心代码,清清爽爽开开心心读源码 ~

    public void init(MapOutputCollector.Context context)  {// 0.随便看看job = context.getJobConf();reporter = context.getReporter();mapTask = context.getMapTask();mapOutputFile = mapTask.getMapOutputFile();sortPhase = mapTask.getSortPhase();spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);partitions = job.getNumReduceTasks();rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();// 1.溢写的阈值 0.8 , 剩下的 0.2 空间还可以继续使用final float spillper =job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);// 2.缓冲区的默认大小final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,INDEX_CACHE_MEMORY_LIMIT_DEFAULT);// 3. 排序器:如果没有自定义,就使用默认的快排算法// 排序的本质就是在做比较:字典序或者数值序,所以排序器要用到【比较器】后边会说sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",QuickSort.class, IndexedSorter.class), job);//--------------------这可就是大名鼎鼎的环形缓冲区,真™牛X的设计---------------int maxMemUsage = sortmb << 20;maxMemUsage -= maxMemUsage % METASIZE;kvbuffer = new byte[maxMemUsage];bufvoid = kvbuffer.length;kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();setEquator(0);bufstart = bufend = bufindex = equator;kvstart = kvend = kvindex;maxRec = kvmeta.capacity() / NMETA;softLimit = (int)(kvbuffer.length * spillper);bufferRemaining = softLimit;//--------------------------------------------------------------------// k/v serialization// 4.获取【比较器】进行排序。如果没有自定义,就使用默认的。// key 类型都是Hadoop封装的可序列化类,自身都带比较器comparator = job.getOutputKeyComparator();.............// output counters.............// compression:数据压缩............// combiner:相同的key在map端做一次合并,减少reduce拉取的数据量.为我们提供了调优接口// 俗称:小reduce ,会在map端发生一次或多次. 之后的文章会介绍这个源码.............// 4. 溢写线程 // 当环形缓冲区的占用到80%,将缓冲区中的数据写入到磁盘// 此时的缓冲区是多个线程共享的:有线程在往磁盘写,有线程在往缓冲区写// 怎样防止读写线程碰撞?答:反向写数据到缓冲区spillInProgress = false;minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);spillThread.setDaemon(true);spillThread.setName("SpillThread");spillLock.lock();try {spillThread.start();while (!spillThreadRunning) {spillDone.await();}} catch (InterruptedException e) {} finally {spillLock.unlock();}}

后边源码也没必要一行行看了,直接文字总结描述了

MapOutBuffer:

map 输出的K-V会被序列化成字节数组,计算出分区号,最终是三元组<k,v,p>

buffer 是map过程使用到的环形缓冲区:

  • 本质是字节数组;
  • 赤道:两端分别存放K-V,索引;
  • 索引:对K-V的索引,固定长度16B,4个int:分区号P,K的偏移量,V的偏移量,V的数据长度;
  • 数据填充到缓冲区的阈值 80% 时,启动溢写线程;
  • 快速排序 80%的数据,同时Map输出的线程向缓冲区的剩余部分写入;
  • 快速排序的过程,比较的是key,但是移动的是索引;
  • 溢写时只要排序后的索引,溢出数据就是有序的;

注意:排序是二次排序:

  • 分区有序:reduce拉取数据是按照分区拉取;
  • 分区内key 有序:因为reduce计算是按照分组计算;

调优:在溢写过程中会发生combiner

  • 其实就是一个 map 里的reduce,按照组进行统计;
  • 发生时间点:排序之后相同的key放在一起了,开始combiner,然后溢写;
  • minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3),最终map结束输出过程buffer会溢出多个小文件,当文件的个数达到3个时,map会把小文件合并,避免文件的碎片化【小文件问题,后边还会提及】

附 溢写线程相关源码:

protected class SpillThread extends Thread {@Overridepublic void run() {spillLock.lock();spillThreadRunning = true;try {while (true) {spillDone.signal();while (!spillInProgress) {spillReady.await();}try {spillLock.unlock();// 排序并溢写会被调用sortAndSpill();} catch (Throwable t) {sortSpillException = t;} finally {spillLock.lock();if (bufend < bufstart) {bufvoid = kvbuffer.length;}kvstart = kvend;bufstart = bufend;spillInProgress = false;}}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {spillLock.unlock();spillThreadRunning = false;}}}

sortAndSpill()

private void sortAndSpill() throws IOException, ClassNotFoundException,InterruptedException {//approximate the length of the output file to be the length of the//buffer + header lengths for the partitionsfinal long size = (bufend >= bufstart? bufend - bufstart: (bufvoid - bufend) + bufstart) +partitions * APPROX_HEADER_LENGTH;FSDataOutputStream out = null;try {// create spill filefinal SpillRecord spillRec = new SpillRecord(partitions);final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);out = rfs.create(filename);final int mstart = kvend / NMETA;final int mend = 1 + // kvend is a valid record(kvstart >= kvend? kvstart: kvmeta.capacity() + kvstart) / NMETA;sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);int spindex = mstart;final IndexRecord rec = new IndexRecord();final InMemValBytes value = new InMemValBytes();for (int i = 0; i < partitions; ++i) {IFile.Writer<K, V> writer = null;try {long segmentStart = out.getPos();writer = new Writer<K, V>(job, out, keyClass, valClass, codec,spilledRecordsCounter);// 会调用combinerif (combinerRunner == null) {// spill directlyDataInputBuffer key = new DataInputBuffer();while (spindex < mend &&kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {final int kvoff = offsetFor(spindex % maxRec);int keystart = kvmeta.get(kvoff + KEYSTART);int valstart = kvmeta.get(kvoff + VALSTART);key.reset(kvbuffer, keystart, valstart - keystart);getVBytesForOffset(kvoff, value);writer.append(key, value);++spindex;}} else {int spstart = spindex;while (spindex < mend &&kvmeta.get(offsetFor(spindex % maxRec)+ PARTITION) == i) {++spindex;}// Note: we would like to avoid the combiner if we've fewer// than some threshold of records for a partitionif (spstart != spindex) {combineCollector.setWriter(writer);RawKeyValueIterator kvIter =new MRResultIterator(spstart, spindex);combinerRunner.combine(kvIter, combineCollector);}}

MapReduce —— MapTask阶段源码分析(Output环节)相关推荐

  1. 《MapReduce 2.0源码分析与编程实战》一第1章 HBase介绍

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公众号查看. ...

  2. 《MapReduce 2.0源码分析与编程实战》一1.5 看,大象也会跳舞

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,第1.5节,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公 ...

  3. 《MapReduce 2.0源码分析与编程实战》一1.6 本章小结

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,第1.6节,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公 ...

  4. Mapreduce 任务提交源码分析1

    2019独角兽企业重金招聘Python工程师标准>>> 提交过程 一般我们mapreduce任务是通过如下命令进行提交的 $HADOOP_HOME/bin/hadoop jar $M ...

  5. 图解U-Boot:第一阶段源码分析

    U-Boot第一阶段的启动流程.这个阶段主要是初始化硬件设备,为加载U-Boot的第二阶段代码准备RAM空间最后跳转到lib_arm/board.c中start_armboot函数,这是第二阶段的入口 ...

  6. 《MapReduce 2.0源码分析与编程实战》一第2章 入门

    本节书摘来异步社区c书中的第2章,第2.1节,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公众号查看. 第2章 入门 HBase实战 本章涵盖的内容 连 ...

  7. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

  8. MapReduce 源码分析(一)准备阶段

    MapReduce 源码分析 本篇博客根据wordCount代码进行分析底层源码的.以下称它为WC类. package com.henu;import org.apache.hadoop.conf.C ...

  9. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)

    1.概述 MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁.TaskTracker周期性地调用心跳RPC函数,汇 ...

最新文章

  1. R语言switch语句语法、实战:Switch语句用来处理嵌套的if else处理起来比较困难或者麻烦的条件判断问题
  2. 文巾解题 190. 颠倒二进制位
  3. java操作excel文件之系列一:《读取excel文件的内容保存到数据库》
  4. k型热电偶分度表_一张表搞定热电偶与热电阻的问题
  5. BFS HDOJ 1242 Rescue
  6. linux下删除目录及其子目录下某种类型文件
  7. Java高并发编程详解系列-线程通信
  8. API接口设计之RESTful软件架构风格
  9. 自动驾驶攻破的难点在哪,何时能到Level 5?
  10. [Silverlight入门系列]用TransformToVisual和Transform取得元素绝对位置(Location)
  11. python从入门到精通pdf-跟老齐学Python:从入门到精通 完整版PDF[7MB]
  12. python中数据处理的格式,json.csv txt excel
  13. 百万生意难度跟十万一样,十万的也没做成
  14. mysql is fashion_Blog/MySQL.md at master · fashionzzZ/Blog · GitHub
  15. 成功粉碎北信源监控程序vrvedp_m.exe ,vrvrf_c64.exe,svchost.exe,vrvrf_c.exe
  16. Could not find module ‘xxx‘ for target ‘xxx‘; found: i386, x86_64-apple-ios-simula错误解决
  17. jQuery打字效果
  18. 寒冬,送点社区温暖。
  19. 如何快速的把m4a转换成mp3格式
  20. 软件设计模式--软件设计演变过程

热门文章

  1. sql在线练习网站(http://sqlzoo.cn)答案解析(1)
  2. 李想:从放弃高考到如今第三次创业,我的经验和教训
  3. r720 linux 双系统,联想R720拯救者i5 7300黑苹果MacOS10.14.2安装和双系统引导
  4. 三极管放大电路的输出电阻
  5. 中级工程师职称评定条件及申报资料,伴德诚
  6. vue uniapp 微信小程序 搜索下拉框 模糊搜索
  7. 新生报到小程序毕业设计,微信新生报到小程序系统设计与实现,微信小程序毕业设计论文怎么写毕设源码开题报告需求分析怎么做
  8. ZIP文件如何解除解压密码
  9. 3个行业的裂变营销案例,实体店如何低成本拓客,让客户自动裂变
  10. 网络游戏私服行业现状和反私服的措施