MapReduce编程模型

目录

  • MapReduce编程模型
    • 1、MapReduce编程模型简介
    • 2、什么是MapReduce
    • 3、MapReduce的优缺点
    • 4、MapReduce程序设计方法
    • 5、WordCount编程实例
      • 执行程序
    • 6、Hadoop MapReduce架构
    • 7、MapReduce实战开发

1、MapReduce编程模型简介

MapReduce源于Google的一篇论文,它充分借鉴了分而治之的思想,将z个数据处理过程拆分成主要的Map(映射)与 Reduce(化简)两步。这样,即使用户不懂分布式计算框架的内部运行机制,只要能用Map和Reduce的思想描述清楚要处理的问题,即编写map()和reduce()函数,就能轻松地使问题的计算实现分布式,并在 Hadoop上运行。

2、什么是MapReduce

  • MapReduce采用的是“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各子节点共同完成,接着通过整合各子节点的中间结果,得到最终的结果。简言之,MapReduce就是“分散任务,汇总结果”。

  • 从MapReduce自身的命名特点可以看出,MapReduce有两个阶段组成:Map和Reduce.用户只需编写map()和reduce()两个函数,即可完成简单的分布式程序的设计。

  • map()负责将任务分散成多个子任务,map()函数以key/value(键/值)对作为输入,产生另外一系列key/value对作为中间输出写人本地磁盘。MapReduce 框架会自动将这些中间数据按照key值进行聚集,且key值相同的数据被统十交给reduce()函数处理。

  • reduce()负责把分解后多个任务的处理结果汇总起来。reduce()函数以 key及过应的 value列表作为输人,将key值相同的value值合并后,产生另一组key/value对作为最终结
    果输出写人到HDFS。

  • 至于在并行编程中的其他种种复杂问题,如分布式存储、工作调度、负载均衡、容错处
    理、网络通信等.均由MapReduce框架负责处理,可以不用程序员操心。值得注意的是,用
    MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许 多小的数据集,且每个小数据集都可以完全并行地进行处理。

  • MapReduce可以用除Java以外的其他语言来实现

python Ruby C++ … Java语言为主

3、MapReduce的优缺点

  1. MapReduce在处理数据方面的优点
  • (1)开发简单。得益于MapReduce的编程模型,用户可以不用考虑进程间通信、套接字编程,无须非常高深的技巧,只需要实现一些非常简单的逻辑,其他的交由MapReduce计算框架去完成,大大简化了分布式程序的编写难度。
  • (2)可扩展性强。同 HDFS一样,当集群资源不能满足计算需求时,可以通过增加节点的方式达到线性扩展集群的目的。
  • (3)容错性强。对于节点故障导致的作业失败, MapReduce计算框架会自动将作业分配到正常的节点重新执行,直到任务完成。这些对干用户来说都具添明的国归松排躲
  1. MapReduce在处理数据方面的缺点
  • (1)不适合事务/单一请求处理:MapReduce绝对是一个离线批处理系统,对于批处理数据应用得很好。MapReduce(不论是Google的,还是Hadoop的)是用于处理不适合传统数据库的海量数据的理想技术,但它又不适合事务/单一请求处理。
  • (2)性能问题。想想N个map 实例产生M个输出文件,每个最后由不同的reduce实例处理,这些文件写到运行map实例机器的本地硬盘。如果N是1000,M 是 500,map阶段产生500000个本地文件。当reduce阶段开始,500个reduce实例每个需要读入1000个文件,并用类似FTP协议把它要的输入文件从map实例运行的节点上pull取过来。假如同时有数量级为100的reduce实例运行,那么2个或2个以上的reduce实例同时访问同一个map节点来获取输人文件是不可避免的,即导致大量的硬盘查找,使有效的硬盘运转速度至少降低20%。
  • (3)不适合一般Web应用。大部分Web应用,只是对数据进行简单的访问,每次请求处理所耗费的资源其实非常小,它的问题是高并发,所以要采用负载均衡技术来分担负载。只有在特殊情况下才可能用MR,如创建索引,进行数据分析等。

4、MapReduce程序设计方法

用户在程序编写完成后,按照一定的规则制定程序的输人和输出目录,并提交到Hadoop集群中。Hadoop将输人数据切分成若干个输入分片(split),保证并行计算的效率;在map阶段,数据经过map)函数的处理,转化为key/value 的形式输出;在shuffle阶段基于排序的方法会将key相同的数据聚集在一起;最后的reduce阶段会调用reduce()函数将key值相同的数据合并,并将结果输出。

由图可知:
输入文本(可以不只一个),按行提取文本文档的单词,形成行<k1,v1>键值对(具体形式很多,例如<行数,字符偏移>);
通过Spliting将<k1,v1>细化为单词键值对<k2,v2>,Map分发到各个节点,同时将<k2,v2>归结为list(<k2,v2>);
在进行计算统计前,先用Shuffing将相同主键k2归结在一起形成<k2, list(v2)>
Reduce阶段直接对<k2, list(v2)> 进行合计得到list(<k3,v3>)并将结果返回主节点。

5、WordCount编程实例

  • 测试说明
    以下是测试样例:

    测试输入样例数据集:文本文档test1.txt, test2.txt。

    文档test1.txt中的内容为:

tale as old as time
true as it can be
beauty and the beast

文档test2.txt中的内容为:

ever just the same
ever as before
beauty and the beast

词频统计代码:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {/** MapReduceBase类:实现Mapper和Reducer接口的基类    * Mapper接口: * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。* LongWritable表示每一行起始偏移量  * 第一个Text是用来接受文件中的内容,  * 第二个Text是用来输出给Reduce类的key,  * IntWritable是用来输出给Reduce类的value    */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{/**LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。*/private final static IntWritable one = new IntWritable(1);private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值/** Mapper接口中的map方法: * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)* 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对 * 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。    * OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。 * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output * Reporter 用于报告整个应用的运行进度*/  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {/** 原始数据(以test1.txt为例):*tale as old as timetrue as it can bebeauty and the beastmap阶段,数据如下形式作为map的输入值:key为偏移量<0  tale as old as time><21 world java hello><39 you me too> *//*** 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)* 格式如下:前者是键值,后者数字是值* tale 1* as 1* old 1* as 1* time 1* true 1* as 1* it 1* can 1* be 1* beauty 1* and 1* the 1* beast 1* 这些键值对作为map的输出数据*/StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();/** reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):* (tablie [1])* (as [1,1,1])* (old [1])* (time [1])* (true [1])* (it [1])* (can [1])* (be [1])* (beauty [1])* (and [1])* (the [1])* (beast [1])* 作为reduce的输入* */public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {//****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****///*********begin*********/int count=0;  for(IntWritable itr:values)  {  count+=itr.get();  /*循环统计*/  }  /*统计完成后,将结果输出.....*/  result.set(count);context.write(key,result);/*********end**********/}}public static void main(String[] args) throws Exception {/*** JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 */  Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();/** 需要配置输入和输出的HDFS的文件路径参数* 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出*/if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称job.setJarByClass(WordCount.class);//为job设置Mapper类 job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类job.setMapperClass(TokenizerMapper.class); //为设置map函数job.setReducerClass(IntSumReducer.class);  //为设置reduce函数job.setOutputKeyClass(Text.class);//为设置输出的key类型job.setOutputValueClass(IntWritable.class);//为设置输出的value类型FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径System.exit(job.waitForCompletion(true) ? 0 : 1);//如果程序正常运行成功,程序就会正常退出}
}

执行程序

在程序编写完成后,还需要将代码打包成jar文件并运行。用eclipse自带的打包工具
导出为wordcount.jar文件,上传至集群任一节点,并在该节点执行命令:

hadoop jar wordcount.jar hellohadoop .WordCount /user/test/input /user/test/output

Hellohadoop为程序的包名,WordCount为程序的类名,/user/test/input为HDFS存放文本的目录(如果指定一个目录为MapReduce输人路径,则MapReduce会将该路径下的所有文件作为输入文件;如果指定一个文件,则MapReduce只会将该文件作为输人),/user/test/output为作业的输出路径(该路径在作业运行前必须不存在)。

命令执行后,屏幕会打出有关任务进度的日志:

22/04/22 00:53:23 INFO mapreduce .Job: Running job: job 1442977437282 0466
22/04/22 00:53:34 INFO mapreduce .Job: Job job_ 1442977437282 0466 running in uber mode : false
22/04/22 00:53:34 INFO mapreduce.Job: map 0%reduce 0%22/04/22 00:53:46 INFO mapreduce.Job: map 100%reduce 0%
22/04/22 00:53:54 INFO mapreduce .Job: map 100%reduce 100%
...

当任务完成后,屏幕会输出相应的日志:

22/04/22 00:53:55 INFO mapreduce.Job: Job job_1442977437282_0466
completed successfully
22/04/22 00:53:55 INFO mapreduce.Job: Counters: 49
...

接下来我们查看输出目录:

hadoop fs- 1s/user/test/output

会看到:

-rw-r--r-- 2 zkpk supergroup 0 2022-04-22 00:53/user/test/ output/_ SUCCESS
一rw-r--r-- 2 zkpk supergroup 3721 2022-04-22 00:53/user/test/output3/part- r-0000

其中,SUCCESS文件是一个空的标志文件,它标志该作业成功完成,而结果则存放在part-00000。执行命令:

hadoop fs- cat/user/test/output/part- r- 0000

得到结果:

and 2
as 4
be 1
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1

6、Hadoop MapReduce架构

以下是hadoop1.0的架构,2.0参考(略)

  • 1、Client 用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。

  • 2、JobTracker JobTracke负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

  • 3、TaskTracker TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用槽“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。

  • 4、Task Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了Map Task 的数目,因为每个split 只会交给一个Map Task 处理。

Map Task 执行过程如下图 所示。由该图可知,Map Task 先将对应的split 迭代解析成一个个key/value 对,依次调用用户自定义的map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition 将被一个Reduce Task 处理。

Reduce Task 执行过程如下图所示。该过程分为三个阶段:
①从远程节点上读取MapTask 中间结果(称为“Shuffle 阶段”);
②按照key 对key/value 对进行排序(称为“Sort 阶段”);
③依次读取<key, value list>,调用用户自定义的reduce() 函数处理,并将最终结果存到HDFS 上(称为“Reduce 阶段”)。

7、MapReduce实战开发

文件内容合并去重

输入文件file1的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

输入文件file2的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

案例代码如下:

import java.io.IOException;import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class Merge {/*** @param args* 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C*///在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedExceptionpublic static class Map  extends Mapper<Object, Text, Text, Text>{/********** Begin **********/public void map(Object key, Text value, Context content) throws IOException, InterruptedException {  Text text1 = new Text();Text text2 = new Text();StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {text1.set(itr.nextToken());text2.set(itr.nextToken());content.write(text1, text2);}}  /********** End **********/} //在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedExceptionpublic static class  Reduce extends Reducer<Text, Text, Text, Text> {/********** Begin **********/public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Set<String> set = new TreeSet<String>();for(Text tex : values){set.add(tex.toString());}for(String tex : set){context.write(key, new Text(tex));}}  /********** End **********/}public static void main(String[] args) throws Exception{// TODO Auto-generated method stubConfiguration conf = new Configuration();conf.set("fs.default.name","hdfs://localhost:9000");Job job = Job.getInstance(conf,"Merge and duplicate removal");job.setJarByClass(Merge.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);String inputPath = "/user/tmp/input/";  //在这里设置输入路径String outputPath = "/user/tmp/output/";  //在这里设置输出路径FileInputFormat.addInputPath(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

根据输入文件file1和file2合并得到的输出文件file3的样例如下:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

Hadoop MapReduce编程模型相关推荐

  1. [转]Hadoop集群_WordCount运行详解--MapReduce编程模型

    Hadoop集群_WordCount运行详解--MapReduce编程模型 下面这篇文章写得非常好,有利于初学mapreduce的入门 http://www.nosqldb.cn/1369099810 ...

  2. mapreduce 编程模型

    MapReduce是在总结大量应用的共同特点的基础上抽象出来的分布式计算框架,它适用的应用场景往往具有一个共同的特点:任务可被分解成相互独立的子问题.基于该特点,MapReduce编程模型给出了其分布 ...

  3. MapReduce编程模型

    1.MapReduce简介 MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上. 一个完整的 ...

  4. 大数据快速入门(05):MapReduce 编程模型赏析

    一.Hadoop 诞生的传奇故事 (上图是 Doug Cutting,hadoop 之父) 1985年,Cutting 毕业于美国斯坦福大学. Cutting 的第一份工作是在 Xerox 做实习生, ...

  5. Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

    不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce. ...

  6. Hadoop MapReduce编程 API入门系列之查找相同字母组成的字谜(三)

    找出相同单词的所有单词.现在,是拿取部分数据集(如下)来完成本项目. 项目需求 一本英文书籍包含成千上万个单词或者短语,现在我们需要在大量的单词中,找出相同字母组成的所有anagrams(字谜). 思 ...

  7. 一个wordcount程序轻松玩转MapReduce编程模型

    可以毫不夸张的说,几乎开发中绝大部分的MR程序都是基于wordcount编程模型而来,或者说用wordcount变化而来(改变的主要是业务方面的逻辑).所以,熟练掌握wordcount编程模型,是掌握 ...

  8. Mapreduce编程模型(一)

    1.1Mapreduce模型简介 Mapreduce是一种可用于数据处理的编程模型,Hadoop上可以运行各种语言版本的Mapreduce程序.Mapreduce程序是并行运行的,采用了分治的思想.编 ...

  9. Hadoop MapReduce编程 API入门系列之最短路径(十五)

    不多说,直接上代码. ====================================== = Iteration: 1 = Input path: out/shortestpath/inpu ...

最新文章

  1. HDU 2018 母牛的故事
  2. 【Android 逆向】Android 权限 ( 查看内存信息 | 查看 CPU 信息 | 查看电池信息 | 查看账户信息 | 查看 Activity 信息 | 查看 Package 信息 )
  3. php 数据中心,数据层 · Thinkphp 独立数据中心使用手册 · 看云
  4. sqlyog要先安装mysql_MySQL和SQLyog的配置-安装及遇到的问题
  5. 二十万字C/C++、嵌入式软开面试题全集宝典一
  6. babel css3新特性_2018年面试前端总结
  7. Spring注解驱动开发
  8. Abp Vnext Pro 的 Vue 实现版本
  9. 限制MySQL Binlog的传输速率
  10. amigo幸运字符什么意思_转载 | 史上最全 python 字符串操作指南
  11. std::move()源码分析
  12. 金九银十正确打开方式!快手三面面试真题
  13. step文件查看软件_3D PDF文件转换为step
  14. sql注入之——sqlmap教程
  15. 比Excel还简单的SQL语句查询
  16. 带有en的单词有哪些_英语前缀大全en:开头是EN的单词有哪些
  17. 【Linux】服务器部署:阿里云服务器购买配置与报价参考
  18. NT Server无盘站配置技术详解
  19. FS4059B是5V输入升压充电8.4V1.5A的升压恒流充电IC高效串联充电
  20. java调用帆软cpt文件_报表中心FineReport中java如何直接调用报表打印

热门文章

  1. 递归算法的时间复杂度和空间复杂度
  2. Group by 后面直接加数字
  3. visio方向键不能移动对象
  4. 计算机一级考试office题库2016,2016年计算机一级MS Office考试试题及答案
  5. 区分处理器的字,字长和汇编中的字,双字,四字
  6. Cobalt Strike:使用已知的私钥解密流量 -Part 2
  7. Uncaught TypeError: Cannot read property ‘install‘ of undefined
  8. Impinj Spedway 在使用 Telnet 时需要用 Putty 进行连接,并且连接类型需要选 SSH
  9. 用WordCloud词云+LDA主题模型,带你读一读《芳华》(python实现)
  10. iOS开发网络篇 一一 文件上传