mapreduce 丢数据_大数据之MapReduce详解
1.什么是Map/Reduce,看下面的各种解释:
(1)MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框,就是mapreduce,缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程。
(2)Mapreduce是一种编程模型,是一种编程方法,抽象理论。
(3)下面是一个关于一个程序员是如何个妻子讲解什么是MapReduce?文章很长请耐心的看。
我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道:
我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)
妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。
妻子: 但这和MapReduce有什么关系?
我: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.
妻子: 好吧。
我:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?
妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。
我: 没错,让我们把MapReduce的概念应用到食谱上。Map和Reduce其实是两种操作,我来给你详细讲解下。
Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。 Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。
Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。
妻子: 所以,这就是MapReduce?
我: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。
妻子: 分布式计算? 那是什么?请给我解释下吧。
我: 没问题。
我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?
妻子: 我会找一个能为我大量提供原料的供应商。
我:是的..就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。
妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。
我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。
这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。
妻子:但是我怎么会制造出不同种类的番茄酱呢?
我:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的 map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。
(4)上面都是从理论上来说明什么是MapReduce,那么咱们在MapReduce产生的过程和代码的角度来理解这个问题。
如果想统计下过去10年计算机论文出现最多的几个单词,看看大家都在研究些什么,那收集好论文后,该怎么办呢?
方法一:
我可以写一个小程序,把所有论文按顺序遍历一遍,统计每一个遇到的单词的出现次数,最后就可以知道哪几个单词最热门了。 这种方法在数据集比较小时,是非常有效的,而且实现最简单,用来解决这个问题很合适。
方法二:
写一个多线程程序,并发遍历论文。
这个问题理论上是可以高度并发的,因为统计一个文件时不会影响统计另一个文件。当我们的机器是多核或者多处理器,方法二肯定比方法一高效。但是写一个多线程程序要比方法一困难多了,我们必须自己同步共享数据,比如要防止两个线程重复统计文件。
方法三:
把作业交给多个计算机去完成。
我们可以使用方法一的程序,部署到N台机器上去,然后把论文集分成N份,一台机器跑一个作业。这个方法跑得足够快,但是部署起来很麻烦,我们要人工把程序copy到别的机器,要人工把论文集分开,最痛苦的是还要把N个运行结果进行整合(当然我们也可以再写一个程序)。
方法四:
让MapReduce来帮帮我们吧!
MapReduce本质上就是方法三,但是如何拆分文件集,如何copy程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给MapReduce。
1.1 MapReduce到底是什么
Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,
一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集。
1.2 MapReduce做什么
MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。
(1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
(2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。
一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
1.3 MapReduce工作机制
实体一:客户端,用来提交MapReduce作业。
实体二:JobTracker,用来协调作业的运行。
实体三:TaskTracker,用来处理作业划分后的任务。
实体四:HDFS,用来在其它实体间共享作业文件。
二、Hadoop中的MapReduce框架
一个MapReduce作业通常会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式去处理它们。
框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。
通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上,也就是说,计算节点和存储节点通常都是在一起的。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使得整个集群的网络带宽被非常高效地利用。
2.1 MapReduce框架的组成
(1)JobTracker
JobTracker负责调度构成一个作业的所有任务,这些任务分布在不同的TaskTracker上(由上图的JobTracker可以看到2 assign map 和 3 assign reduce)。你可以将其理解为公司的项目经理,项目经理接受项目需求,并划分具体的任务给下面的开发工程师。
(2)TaskTracker
TaskTracker负责执行由JobTracker指派的任务,这里我们就可以将其理解为开发工程师,完成项目经理安排的开发任务即可。
2.2 MapReduce的输入输出
MapReduce框架运转在<key,value>键值对上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对有可能是不同的。
一个MapReduce作业的输入和输出类型如下图所示:可以看出在整个流程中,会有三组<key,value>键值对类型的存在。
2.3 MapReduce的处理流程
这里以WordCount单词计数为例,介绍map和reduce两个阶段需要进行哪些处理。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:
1)map任务处理
2)reduce任务处理
三、第一个MapReduce程序:WordCount
WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到。
WordCount单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数;
3.1 初始化一个words.txt文件并上传HDFS
首先在Linux中通过Vim编辑一个简单的words.txt,其内容很简单如下所示:
Hello Edison Chou Hello Hadoop RPC Hello Wncud Chou Hello Hadoop MapReduce Hello Dick Gu
通过Shell命令将其上传到一个指定目录中,这里指定为:/testdir/input
3.2 自定义Map函数
在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。
我们要做的就是覆盖map 函数和reduce 函数,首先我们来覆盖map函数:继承Mapper类并重写map方法
/*** @author Edison Chou* @version 1.0* @param KEYIN* →k1 表示每一行的起始位置(偏移量offset)* @param VALUEIN* →v1 表示每一行的文本内容* @param KEYOUT* →k2 表示每一行中的每个单词* @param VALUEOUT* →v2 表示每一行中的每个单词的出现次数,固定值为1*/public static class MyMapper extendsMapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split(" ");for (String word : spilted) {context.write(new Text(word), new LongWritable(1L));}};}
Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;
从代码中可以看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是因为HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。
这里的map函数中通过空格符号来分割文本内容,并对其进行记录;
3.3 自定义Reduce函数
现在我们来覆盖reduce函数:继承Reducer类并重写reduce方法
/*** @author Edison Chou* @version 1.0* @param KEYIN* →k2 表示每一行中的每个单词* @param VALUEIN* →v2 表示每一行中的每个单词的出现次数,固定值为1* @param KEYOUT* →k3 表示每一行中的每个单词* @param VALUEOUT* →v3 表示每一行中的每个单词的出现次数之和*/public static class MyReducer extendsReducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key,java.lang.Iterable<LongWritable> values,Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long count = 0L;for (LongWritable value : values) {count += value.get();}context.write(key, new LongWritable(count));};}
Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型通常和map的输出key、value类型保持一致)和输出的key、value 类型。
这里的reduce函数主要是将传入的<k2,v2>进行最后的合并统计,形成最后的统计结果。
3.4 设置Main函数
(1)设定输入目录,当然也可以作为参数传入
public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
(2)设定输出目录(输出目录需要是空目录),当然也可以作为参数传入
public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
(3)Main函数的主要代码
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 0.0:首先删除输出路径的已有生成文件FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);Path outPath = new Path(OUTPUT_PATH);if (fs.exists(outPath)) {fs.delete(outPath, true);}Job job = new Job(conf, "WordCount");job.setJarByClass(MyWordCountJob.class);// 1.0:指定输入目录FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));// 1.1:指定对输入数据进行格式化处理的类(可以省略)job.setInputFormatClass(TextInputFormat.class);// 1.2:指定自定义的Mapper类job.setMapperClass(MyMapper.class);// 1.3:指定map输出的<K,V>类型(如果<k3,v3>的类型与<k2,v2>的类型一致则可以省略)job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 1.4:分区(可以省略)job.setPartitionerClass(HashPartitioner.class);// 1.5:设置要运行的Reducer的数量(可以省略)job.setNumReduceTasks(1);// 1.6:指定自定义的Reducer类job.setReducerClass(MyReducer.class);// 1.7:指定reduce输出的<K,V>类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 1.8:指定输出目录FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));// 1.9:指定对输出数据进行格式化处理的类(可以省略)job.setOutputFormatClass(TextOutputFormat.class);// 2.0:提交作业boolean success = job.waitForCompletion(true);if (success) {System.out.println("Success");System.exit(0);} else {System.out.println("Failed");System.exit(1);}}
在Main函数中,主要做了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交作业;匆匆看下来,代码有点多,但有些其实是可以省略的。
.5 运行吧小DEMO
(1)调试查看控制台状态信息
(2)通过Shell命令查看统计结果
四、使用ToolRunner类改写WordCount
Hadoop有个ToolRunner类,它是个好东西,简单好用。无论在《Hadoop权威指南》还是Hadoop项目源码自带的example,都推荐使用ToolRunner。
4.1 最初的写法
下面我们看下src/example目录下WordCount.java文件,它的代码结构是这样的:
public class WordCount {// 略...public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 略...Job job = new Job(conf, "word count");// 略...System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
WordCount.java中使用到了GenericOptionsParser这个类,它的作用是将命令行中参数自动设置到变量conf中。举个例子,比如我希望通过命令行设置reduce task数量,就这么写:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
上面这样就可以了,不需要将其硬编码到java代码中,很轻松就可以将参数与代码分离开。
4.2 加入ToolRunner的写法
至此,我们还没有说到ToolRunner,上面的代码我们使用了GenericOptionsParser帮我们解析命令行参数,编写ToolRunner的程序员更懒,它将 GenericOptionsParser调用隐藏到自身run方法,被自动执行了,修改后的代码变成了这样:
public class WordCount extends Configured implements Tool {@Overridepublic int run(String[] arg0) throws Exception {Job job = new Job(getConf(), "word count");// 略...System.exit(job.waitForCompletion(true) ? 0 : 1);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new WordCount(), args);System.exit(res);}
}
看看这段代码上有什么不同:
(1)让WordCount继承Configured并实现Tool接口。
(2)重写Tool接口的run方法,run方法不是static类型,这很好。
(3)在WordCount中我们将通过getConf()获取Configuration对象。
可以看出,通过简单的几步,就可以实现代码与配置隔离、上传文件到DistributeCache等功能。修改MapReduce参数不需要修改java代码、打包、部署,提高工作效率。
4.3 重写WordCount程序
public class MyJob extends Configured implements Tool {public static class MyMapper extendsMapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {......}};}public static class MyReducer extendsReducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key,java.lang.Iterable<LongWritable> values,Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {......};}// 输入文件路径public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";// 输出文件路径public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";@Overridepublic int run(String[] args) throws Exception {// 首先删除输出路径的已有生成文件FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());Path outPath = new Path(OUTPUT_PATH);if (fs.exists(outPath)) {fs.delete(outPath, true);}Job job = new Job(getConf(), "WordCount");// 设置输入目录FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));// 设置自定义Mapperjob.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 设置自定义Reducerjob.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 设置输出目录FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));System.exit(job.waitForCompletion(true) ? 0 : 1);return 0;}public static void main(String[] args) {Configuration conf = new Configuration();try {int res = ToolRunner.run(conf, new MyJob(), args);System.exit(res);} catch (Exception e) {e.printStackTrace();}}}
mapreduce 丢数据_大数据之MapReduce详解相关推荐
- 【Dash搭建可视化网站】项目10:疫情数据可视化大屏制作步骤详解
疫情数据可视化大屏制作步骤详解 1 项目效果图 2 项目架构 3 文件介绍和功能完善 3.1 assets文件夹介绍 3.2 app.py和index.py文件完善 3.3 header.py文件完善 ...
- 管道过滤模式 大数据_大数据管道配方
管道过滤模式 大数据 介绍 (Introduction) If you are starting with Big Data it is common to feel overwhelmed by t ...
- 数字经济的核心是对大数据_大数据崛起为数字世界的核心润滑剂
数字经济的核心是对大数据 "Information is the oil of the 21st century, and analytics is the combustion engin ...
- print的describe的展示全部数据_大数据项目中的QA需要迎接新的挑战
根据IDC全球半年度大数据和分析支出指南的最新预测,到2022年全球大数据和业务分析解决方案的收入将达到2600亿美元.在大数据和业务分析解决方案上投资增长最快的行业包括银行(复合年增长率13.3%) ...
- mapreduce编程规范_大数据之MapReduce详解
今天要讲的是MapReduce 目录 今天先总体说下MapReduce的相关知识,后续将会详细说明对应的shuffle.mr与yarn的联系.以及mr的join操作的等知识.以下内容全是个人学习后的见 ...
- hadoop 传感器数据_大数据时代Hadoop的本质,你有过认真了解吗?
除非你过去几年一直隐居,远离这个计算机的世界,否则你不可能没有听过Hadoop,全名Apache Hadoop,是一个在通用低成本的硬件上处理存储和大规模并行计算的一个开源框架,Hadoop本质 ...
- hbase中为何不能向表中插入数据_大数据HBase理论实操面试题
1.HBase的特点是什么? 1)大:一个表可以有数十亿行,上百万列: 2)无模式:每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一张表中不同的行可以有截然不同的列: 3)面向列: ...
- 横截面数据和面板数据_大数据的小横截面
横截面数据和面板数据 大数据是一个用于数据集的术语,其大小超出了常用软件工具在可容忍的经过时间内捕获,管理和处理数据的能力. 大数据大小是一个不断变化的目标,目前单个数据集中的数据范围从几十TB到许多 ...
- 云消防大数据_大数据在智慧消防中的应用
摘要:大数据是指难以用现有的一般技术手段来管理的大量数据的集合,使用主流软件工具,无法在短时间内实现获取.管理.处理.并使之成为有效的辅助企业运营决策的信息.其强大的决策力能更高效的处理海量的和多样化 ...
最新文章
- 90.前端 :执行方法前提示功能
- 关于datagrid中控件利用js调用后台方法事件的问题
- HTML5的基本入门格式介绍
- 纪念下数据路上遇到的贵人
- 【渝粤教育】国家开放大学2018年春季 8622-22T社会调查研究与方法 参考试题
- 抽象类java启动线程_java 线程复习笔记
- u-boot移植随笔:System.map文件格式
- 图解用户登录验证业务流程(推荐)
- Havel-Hakimi定理 POJ1659
- 路由器交换机[置顶] 路由器和交换机的综合实验⑵
- EPC901安装XP
- 二本天坑,一战成硕,上岸北邮
- Mysql表数据如何增加汇总统计行(GROUP BY WITH ROLLUP函数用法)
- 华为云计算HCIE学习笔记-FusionCompute
- matlab设置脚本,MATLAB脚本和功能
- 将英文转化为二进制黑白码
- 04 爬取周杰伦首页歌单
- 微型计算机升级换代的两种,嵌入式的LED点阵显示屏的研究与实现
- android仿微信图片上传进度,Android开发之模仿微信打开网页的进度条效果(高仿)...
- 2015年系统架构师考试题详解