先看一段代码:

  

package com.abc;import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class TestWorldCount {static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {private final static LongWritable one = new LongWritable(1);private Text word = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {word.set(tokenizer.nextToken());context.write(word, one);}}}static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {Iterator<LongWritable> iter= values.iterator();Long sum = 0L;LongWritable res =new LongWritable();while(iter.hasNext()){sum +=iter.next().get();}res.set(sum);context.write(key, res);}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Job job = new Job(conf, "word count");job.setJarByClass(TestWorldCount.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);String uri1 = "input0001";String uri3 = "output0001/wc";FileInputFormat.addInputPath(job, new Path(uri1));FileOutputFormat.setOutputPath(job, new Path(uri3));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

这个是最最简单的WorldCount的例子,在设置完一系列参数后,通过Job类来等待程序运行结束。下面是运行的基本流程:

1.Job类初始化JobClient实例,JobClient中生成JobTracker的RPC实例,这样可以保持与JobTracker的通讯,JobTracker的地址和端口等都是外部配置的,通过Configuration对象读取并且传入。

2.JobClient提交作业。

3.JobClient生成作业目录。

4.从本地拷贝MapReduce的作业jar文件(一般是自己写的程序代码jar)。

5.如果DistributedCache中有需要的数据,从DistributedCache中拷贝这部分数据。

6.根据InputFormat实例,实现输入数据的split,在作业目录上生成job.split和job.splitmetainfo文件。

7.将配置文件写入到作业目录的job.xml文件中。

8.JobClient和JobTracker通讯,提交作业。

9.JobTracker将job加入到job队列中。

10.JobTracker的TaskScheduler对job队列进行调度。

11.TaskTracker通过心跳和JobTracker保持联系,JobTracker收到后根据心跳带来的数据,判断是否可以分配给TaskTracker Task,TaskScheduler会对Task进行分配。

12.TaskTracker启动TaskRunner实例,在TaskRunner中启动单独的JVM进行Mapper运行。

13.Map端会从HDFS中读取输入数据,执行之后Map输出数据先是在内存当中,当达到阀值后,split到硬盘上面,在此过程中如果有combiner的话要进行combiner,当然sort是肯定要进行的。

14.Map结束了,Reduce开始运行,从Map端拷贝数据,称为shuffle阶段,之后执行reduce输出结果数据,之后进行commit的操作。

15.TaskTracker在收到commit请求后和JobTracker进行通讯,JobTracker做最后收尾工作。

16.JobTracker返回结果给JobClient,运行结束。

附上一张基本流程图:

Map端机制

对于map端的输入,需要做如下的事情:

1.反射构造InputFormat.

2.反射构造InputSplit.

3.创建RecordReader.

4.反射创建MapperRunner(新api形式下是反射创建org.apache.hadoop.mapreduce.Mapper.Context).

对Map端输出,需要做如下的事情:
1.如果有Partitioner的话,反射构造Partitioner。
2.将key/value/Partitioner数据写入到内存当中。
3.当内存当中的数据达到一定阀值了,需要spill到硬盘上面,在spill前,需要进行排序,如果有combiner的话需要进行combiner。
4.sort的规则是先进行Partitioner的排序,然后再进行key的字典排序,默认的是快速排序。
5.当生成多个spill文件时,需要进行归并,最终归并成一个大文件

关于排序:
1.在内存中进行排序,整个数据的内存不会进行移动,只是再加上一层索引的数据,排序只要调整索引数据就可以了
2.多个spill文件归并到一个大文件时,是一个归并排序的过程,每一个spill文件都是按分区和key排序好的,所以归并完的文件也是按分区和key排序好的。

在进行归并的时候,也不是一次性的把所有的spill文件归并成一个大文件,而是部分spill文件归并成中间文件,然后中间文件和剩下的spill文件再进行一次归并,依次类推,这个的考虑还是因为一次归并文件太多的话IO消耗太大了,如下图:

Reduce端机制

1。ReduceTask有一个线程和TaskTracker联系,之后TaskTracker和JobTracker联系,获取MapTask完成事件
2. ReduceTask会创建和MapTask数目相等的拷贝线程,用于拷贝MapTask的输出数据,MapTask的数据一般都是非本地的
3. 当有新的MapTask完成事件时,拷贝线程就从指定的机器上面拷贝数据,是通过http的形式进行拷贝
4. 当数据拷贝的时候,分两种情况,当数据量小的时候就会写入内存当中,当数据量大的时候就会写入硬盘当中,这些工作分别由两个线程完成
5. 因为所有的数据都来自不同的机器,所以有多个文件,这些文件需要归并成一个文件,在拷贝文件的时候就会进行归并动作
6. 拷贝和归并过程统称为shuffle过程

Reduce端输出需要做如下的事情:
1.构造RecordWriter,这个是根据客户端设置的OutputFormat中getRecordWriter()方法得到 
2.通过OutputFormat和RecordWriter将结果输出到临时文件中
3.Rudece进行commit过程,和TaskTracker进行通信,TaskTracker和JobTracker进行通信,然后JobTracker返回commit的指令,Reduce进行
commit,将临时结果文件重命名成最终的文件
4.commit成功后,kill掉其他的TaskAttempt

Hadoop MapReduce相关推荐

  1. hadoop错误: 找不到或无法加载主类 org.apache.hadoop.mapreduce.v2.app.MRAppMaster

    错误: 找不到或无法加载主类 org.apache.hadoop.mapreduce.v2.app.MRAppMaster 原创hongxiao2016 最后发布于2019-03-30 21:20:5 ...

  2. hadoop调用python算法_使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  3. Hadoop MapReduce编程 API入门系列之最短路径(十五)

    不多说,直接上代码. ====================================== = Iteration: 1 = Input path: out/shortestpath/inpu ...

  4. Hadoop mapreduce框架简介

    传统hadoop MapReduce架构(老架构) 从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 1.首先用户程序 (JobClient) 提交了一个 job,job 的信息会 ...

  5. mapreduce编程实例python-使用Python语言写Hadoop MapReduce程序

    原标题:使用Python语言写Hadoop MapReduce程序 Python部落(python.freelycode.com)组织翻译,禁止转载,欢迎转发. 在本教程中,我将描述如何使用Pytho ...

  6. hadoop MapReduce实例解析

    1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然 ...

  7. 使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  8. 《Hadoop MapReduce性能优化》一1.3 Hadoop MapReduce的工作原理

    本节书摘来异步社区<Hadoop MapReduce性能优化>一书中的第1章,第1.3节,作者: [法]Khaled Tannir 译者: 范欢动 责编: 杨海玲,更多章节内容可以访问云栖 ...

  9. Hadoop MapReduce的一些相关代码Code

    MapReduce是一种分布式计算模型(distributed programming model),由Google于2004年左右提出,主要用于搜索领域,解决海量数据的计算问题. MapReduce ...

最新文章

  1. MYsql 查询 查询当前月份的数据
  2. C#学习 小知识_多态的简单实现_2018Oct
  3. 报告!我还有几个阿里同事也去了亚运会
  4. UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)
  5. 10个有趣的Python教程,附视频讲解+练手项目。
  6. PHP中错误处理集合
  7. Ubuntu开启SSHD服务
  8. bp神经网络隐含层神经元个数_CNN,残差网络,BP网络
  9. 〖Android〗屏幕触屏事件录制与回放
  10. kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统
  11. 支付宝怎么提交html表单提交,uniapp H5 支付表单提交问题解决烦方案
  12. 动态截屏软件jpg格式
  13. 我只能说,Spring Data REST真的很燥辣
  14. doesn‘t work properly without JavaScript enabled. Please enable it to continue 的原因之一
  15. Flask 框架(四)— 表单处理
  16. 2021年中国VR/AR行业市场投融资现状分析:VR/AR技术领域融资实现双增长[图]
  17. 开启「浏览器多线程下载」选项
  18. 线性代数笔记【空间曲面】
  19. n9_Adding Interactivity Animating Plots_BeautifulSoup_Interactive backends_Tkinter_Plot.ly_FFmpeg
  20. 毕业设计之基于springboot的开源商城系统

热门文章

  1. 【Android 逆向】函数拦截 ( GOT 表数据结构分析 | 函数根据 GOT 表进行跳转的流程 )
  2. 【计算理论】计算复杂性 ( 证明 非确定性图灵机 与 确定性图灵机 的时间复杂度 之间的指数关系 )
  3. 【集合论】容斥原理 ( 复杂示例 )
  4. 【Android 多媒体开发】 MediaPlayer 状态机 接口 方法 解析
  5. 在QT中结构体快速从二进制文件中读取数据
  6. 题解 P4753 【River Jumping】
  7. Swoole练习 Web
  8. php htmlentities函数的问题
  9. NSArray和NSMutableArray
  10. liunx(3)-内核模块编写与系统调用