一、Mapreduce概述

MapReduce是一个编程模型,用以进行大数据量的计算

二、Hadoop MapReduce

(1)MapReduce是什么

Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集

Mapreduce的特点:

  1. 软件框架
  2. 并行处理
  3. 可靠且容错
  4. 大规模集群
  5. 海量数据集

(2)MapReduce做什么

MapReduce的思想就是“分而治之”

1)Mapper负责“分”
把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

  1. 数据或计算的规模相对原任务要大大缩小
  2. 就近计算原则,任务会分配到存放着所需数据的节点上进行计算
  3. 这些小任务可以并行计算彼此间几乎没有依赖关系

2)Reducer负责对map阶段的结果进行汇总
至于需要多少个Reducer,可以根据具体问题,
通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。

三、MapReduce工作机制

作业执行涉及4个独立的实体

  1. 客户端,用来提交MapReduce作业
  2. JobTracker,用来协调作业的运行
  3. TaskTracker,用来处理作业划分后的任务
  4. HDFS,用来在其它实体间共享作业文件

mapreduce作业工作流程图

Mapreduce作业的4个对象

  1. 客户端(client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作;
  2. JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;
  3. TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个就和hdfs里namenode一样存在单点故障,我会在后面的mapreduce的相关问题里讲到这个问题的)
  4. Hdfs:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面

mapreduce运行步骤1

 首先是客户端要编写好mapreduce程序,配置好mapreduce的作业也就是job,
接下来就是提交job了,提交job是提交到JobTracker上的,这个时候JobTracker就会构建这个job,具体就是分配一个新的job任务的ID值

接下来它会做检查操作,这个检查就是确定输出目录是否存在,如果存在那么job就不能正常运行下去,JobTracker会抛出错误给客户端,接下来还要检查输入目录是否存在,如果不存在同样抛出错误,如果存在JobTracker会根据输入计算输入分片(Input Split),如果分片计算不出来也会抛出错误,至于输入分片我后面会做讲解的,这些都做好了JobTracker就会配置Job需要的资源了。

分配好资源后,JobTracker就会初始化作业,初始化主要做的是将Job放入一个内部的队列,让配置好的作业调度器能调度到这个作业,作业调度器会初始化这个job,初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程。

mapreduce运行步骤2

初始化完毕后,作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。

接下来就是任务分配了,这个时候tasktracker会运行一个简单的循环机制定期发送心跳给jobtracker,心跳间隔是5秒,程序员可以配置这个时间,心跳就是jobtracker和tasktracker沟通的桥梁,通过心跳,jobtracker可以监控tasktracker是否存活,也可以获取tasktracker处理的状态和问题,同时tasktracker也可以通过心跳里的返回值获取jobtracker给它的操作指令。

任务分配好后就是执行任务了。在任务执行时候jobtracker可以通过心跳机制监控tasktracker的状态和进度,同时也能计算出整个job的状态和进度,而tasktracker也可以本地监控自己的状态和进度。当jobtracker获得了最后一个完成指定任务的tasktracker操作成功的通知时候,jobtracker会把整个job状态置为成功,然后当客户端查询job运行状态时候(注意:这个是异步操作),客户端会查到job完成的通知的。如果job中途失败,mapreduce也会有相应机制处理,一般而言如果不是程序员程序本身有bug,mapreduce错误处理机制都能保证提交的job能正常完成。

四、mapreduce运行机制

在Hadoop中,一个MapReduce作业会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式处理。框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。

作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。MapReduce框架和分布式文件系统是运行在一组相同的节点,计算节点和存储节点都是在一起的

(1) MapReduce的输入输出

一个MapReduce作业的输入和输出类型: 会有三组<key,value>键值对类型的存在

(2)Mapreduce作业的处理流程

按照时间顺序包括:
输入分片(input split)
map阶段
shuffle阶段:map shuffle(partition、sort/group、combiner、partition  merge)和reduce shuffle(copy、merge(sort/group))
reduce阶段

1)输入分片(input split)

在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务
输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切

假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split)
    即我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

2)map阶段
程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行;

3)combiner阶段

combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。

Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,
例如:

如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

4)shuffle阶段

将map的输出作为reduce的输入的过程就是shuffle了。

5)reduce阶段

和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。

五、Mapreduce框架的相关问题

jobtracker的单点故障

jobtracker和hdfs的namenode一样也存在单点故障,单点故障一直是hadoop被人诟病的大问题,为什么hadoop的做的文件系统和mapreduce计算框架都是高容错的,但是最重要的管理节点的故障机制却如此不好,我认为主要是namenode和jobtracker在实际运行中都是在内存操作,而做到内存的容错就比较复杂了,只有当内存数据被持久化后容错才好做,namenode和jobtracker都可以备份自己持久化的文件,但是这个持久化都会有延迟,因此真的出故障,任然不能整体恢复,另外hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。

六、Mapreduce的单词计数实例

public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//… }
}

(1)Map

 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{  private final static IntWritable one = new IntWritable(1);private Text word = new Text();       public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}

map的方法

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
这里有三个参数,前面两个Object key, Text value就是输入的key和value,第三个参数Context context这是可以记录输入的key和value

例如:context.write(word, one);此外context还会记录map运算的状态。

(2)reduce

 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}

reduce函数的方法

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

  reduce函数的输入也是一个key/value的形式,
不过它的value是一个迭代器的形式Iterable<IntWritable> values,

也就是说reduce的输入是一个key对应一组的值的value,reduce也有context和map的context作用一致。

(3)main函数

    public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
Configuration conf = new Configuration();

运行mapreduce程序前都要初始化Configuration,该类主要是读取mapreduce系统配置信息,这些信息包括hdfs还有mapreduce,也就是安装hadoop时候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解为啥要这么做,这个是没有深入思考mapreduce计算框架造成,我们程序员开发mapreduce时候只是在填空,在map函数和reduce函数里编写实际进行的业务逻辑,其它的工作都是交给mapreduce框架自己操作的,但是至少我们要告诉它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而这些信息就在conf包下的配置文件里。

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}

If的语句好理解,就是运行WordCount程序时候一定是两个参数,如果不是就会报错退出。至于第一句里的GenericOptionsParser类,它是用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值,其实平时开发里我们不太常用它,而是让类实现Tool接口,然后再main函数里使用ToolRunner运行程序,而ToolRunner内部会调用GenericOptionsParser。

    Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);

第一行就是在构建一个job,在mapreduce框架里一个mapreduce任务也叫mapreduce作业也叫做一个mapreduce的job,而具体的map和reduce运算就是task了,这里我们构建一个job,构建时候有两个参数,一个是conf这个就不累述了,一个是这个job的名称。
  
第二行就是装载程序员编写好的计算程序,例如我们的程序类名就是WordCount了。这里我要做下纠正,虽然我们编写mapreduce程序只需要实现map函数和reduce函数,但是实际开发我们要实现三个类,第三个类是为了配置mapreduce如何运行map和reduce函数,准确的说就是构建一个mapreduce能执行的job了,例如WordCount类。
  
第三行和第五行就是装载map函数和reduce函数实现类了,这里多了个第四行,这个是装载Combiner类,这个我后面讲mapreduce运行机制时候会讲述,其实本例去掉第四行也没有关系,但是使用了第四行理论上运行效率会更好。

    job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);

这个是定义输出的key/value的类型,也就是最终存储在hdfs上结果文件的key/value的类型。

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);

第一行就是构建输入的数据文件,
第二行是构建输出的数据文件,
最后一行如果job运行成功了,我们的程序就会正常退出。FileInputFormat和FileOutputFormat可以设置输入输出文件路径,
mapreduce计算时候,输入文件必须存在,要不直Mr任务直接退出。输出一般是一个文件夹,而且该文件夹不能存在。

Hadoop入门(六)Mapreduce相关推荐

  1. Hadoop入门(二)——VMware虚拟网络设置+Windows10的IP地址配置+CentOS7静态IP设置(图文详解步骤2021)

    Hadoop入门(二)--VMware虚拟网络设置+Windows10的IP地址配置+CentOS7静态IP设置(图文详解步骤2021) 之前在上一篇文章中讲述了 CentOS7下载+VM上安装(手动 ...

  2. Hadoop入门(四)——模板虚拟机环境准备(图文详解步骤2021)

    Hadoop入门(四)--模板虚拟机环境准备(图文详解步骤2021) 系列文章传送门 这个系列文章传送门: Hadoop入门(一)--CentOS7下载+VM上安装(手动分区)图文步骤详解(2021) ...

  3. [学习笔记]黑马程序员-Hadoop入门视频教程

    文章目录 参考资料 第一章:大数据导论与Linux基础(p1-p17) 1.1 大数据导论 1.1.1 企业数据分析方向 1.1.2 数据分析基本流程步骤 明确分析的目的和思路 数据收集 数据处理 数 ...

  4. Hadoop入门·环境搭建

    Hadoop入门·环境搭建 1 步骤 硬件环境准备 资源下载 环境部署 2 分布式集群环境部署 2.1 硬件环境准备 本案例中使用三台服务器(仅作为学习案例),分别为Hadoop102,Hadoop1 ...

  5. 大数据时代|核心架构Hadoop入门学习之HDFS,循序渐进求真知

    前言 当今世界,科学技术飞速发展,人们不知不觉的进入了大数据时代.而什么是大数据时代,大数据的发展是什么?这一系列的问题其实很抽象,很难一言半语的概括.但是,在这大数据时代,必须掌握相应的技术作为支撑 ...

  6. Hadoop入门基础教程 Hadoop之单词计数

    单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的src/exampl ...

  7. Hadoop入门基础教程 Hadoop之完全分布式环境搭建

    上一篇我们完成了Hadoop伪分布式环境的搭建,伪分布式模式也叫单节点集群模式, NameNode.SecondaryNameNode.DataNode.JobTracker.TaskTracker所 ...

  8. 一.hadoop入门须知

    目录: 1.hadoop入门须知 2.hadoop环境搭建 3.hadoop mapreduce之WordCount例子 4.idea本地调试hadoop程序 5.hadoop 从mysql中读取数据 ...

  9. 大数据与Hadoop有什么关系?大数据Hadoop入门简介

    学习着数据科学与大数据技术专业(简称大数据)的我们,对于"大数据"这个词是再熟悉不过了,而每当我们越去了解大数据就越发现有个词也会一直被提及那就是--Hadoop 那Hadoop与 ...

  10. hadoop系列四:mapreduce的使用(二)

    转载请在页首明显处注明作者与出处 一:说明 此为大数据系列的一些博文,有空的话会陆续更新,包含大数据的一些内容,如hadoop,spark,storm,机器学习等. 当前使用的hadoop版本为2.6 ...

最新文章

  1. SpringBoot 实战 (八) | 使用 Spring Data JPA 访问 Mysql 数据库
  2. C#中获取指定目录下所有目录的名称、全路径和创建日期
  3. 低水平博士是提升科研生涯的毒药
  4. boost::math模块演示负二项分布使用的简单示例的测试程序
  5. lambda 和 std::function
  6. 【代码笔记】Web-ionic-select
  7. java修改原有txt文件_(转)Java创建txt文件并进行读、写、修改操作
  8. ECCV18 Oral | MVSNet: 非结构化多视点三维重建网络(高精度高效率,代码已开源)...
  9. 预测!显卡容量10年左右会超过500GB。■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■...
  10. python web开发实录
  11. 1106 冒泡排序的语法树
  12. 物联网来了,智能城市离我们还有多远?
  13. Svn插件提交比较慢的解决方法
  14. XcodeGhost事件冷思考:智能时代的达摩克利斯之剑
  15. 2018黑马39期WEB前端视频教程
  16. latex补集怎么打
  17. JSON在线序列化网站
  18. 详解Maven多模块Spring Boot项目从创建到打包
  19. No3 jQuery
  20. AutoJs学习-实现自动刷快手极速版

热门文章

  1. python转字符_python 字符转换
  2. 机器学习之数据预处理——归一化,标准化
  3. 共聚焦图片怎么加标尺_聚焦扶贫政策,打造小康生活
  4. PTA 数据结构与算法题目集(中文)
  5. matlab程序改为m文件名,在MATLAB中,程序文件的扩展名为.m,所以程序文件也称为M文件...
  6. [JavaWeb-MySQL]SQL基本概念,通用语法,分类
  7. [C++11]final关键字的使用
  8. Template Method(模板方法)--类行为型模式
  9. JVM解惑:消失的异常堆栈,log中打印异常堆栈为空
  10. 操作系统——页面置换算法