MapReduce经典案例实战

实验实现过程

重要知识点:

  1. MapReduce是一种分布式并行编程模型,是Hadoop核心子项目之一,如果已经安装了Hadoop,就不需要另外安装MapReduce。
  2. 主要的理论知识点:
    (1)倒排索引
    倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(Inverted File)。
    (2)数据去重
    数据去重主要是为了掌握利用并行化思想来对数据进行有意义的筛选,数据去重指去除重复数据的操作。在大数据开发中,统计大数据集上的多种数据指标,这些复杂的任务数据都会涉及数据去重。
    (3)TopN排序
    TopN分析法是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。
  3. MapReduce的程序可以用Eclipse编译运行或使用命令行编译打包运行,本实验使用Eclipse编译运行MapReduce程序。

实验准备工作:
1、掌握Eclipse以及插件的安装与配置
2、熟悉在Eclipse中操作分布式系统HDFS 中的文件的方法
以上内容前面文章已做:
大数据实验环境准备与配置(1/4)
https://blog.csdn.net/weixin_43640161/article/details/108614907
大数据实验环境准备与配置(2/4)
https://blog.csdn.net/weixin_43640161/article/details/108619802
大数据实验环境准备与配置(3/4)
https://blog.csdn.net/weixin_43640161/article/details/108691921
大数据实验环境准备与配置(第四部分完结)
https://blog.csdn.net/weixin_43640161/article/details/108697510
Hadoop环境配置与测试
https://blog.csdn.net/weixin_43640161/article/details/108745864
分布式文件系统HDFS Shell命令和API编程
https://blog.csdn.net/weixin_43640161/article/details/108879567
MapReduce编程实践
https://blog.csdn.net/weixin_43640161/article/details/108947291

实验一:倒排索引案例实现–请根据理论课案例分析步骤实现具体的倒排索引

假设有file1.txt,file2.txt,file3.txt。它们的内容分别如下:
file1.txt文件内容:MapReduce is simple
file2.txt文件内容:MapReduce is powerful is simple
file3.txt文件内容:Hello MapReduce bye MapReduce
具体步骤如下:
一、在 Eclipse 中创建 MapReduce 项目
点击 File 菜单,选择 New -> Project…:选择 Map/Reduce Project,点击 Next。

填写 Project name 为MapReduceDemo即可,点击 Finish 就创建好了项目。

此时在左侧的 Project Explorer 就能看到刚才建立的项目了。接着右键点击刚创建的 MapReduce项目src,选择 New -> packet,在 Package 处填写 cn.com.sise.mapreduce.invertedindex;

二、Map阶段实现
在cn.com.sise.mapreduce.invertindex包下新建自定义类Mapper类InvertedIndexMapper,该类继承Mapper类,如下图所示。



该类的作用:将文本中的单词按照空格进行切割,并以冒号拼接,“单词:文档名称
”作为key,单词次数作为value,都以文本方式传输至Combine阶段。
参考代码如下:

package cn.com.sise.mapreduce.invertedindex;import java.io.IOException;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text,Text> {private static Text keyInfo = new Text();//存储单词和URL组合private static final Text valueInfo = new Text("1");//存储词频,初始化为1@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{ String line = value.toString();String[] fields = StringUtils.split(line, " ");//得到字段数组FileSplit fileSplit = (FileSplit) context.getInputSplit();//得到这行数据所在的文件切片String fileName = fileSplit.getPath().getName();//根据文件切片得到文件名for (String field : fields){//key值由单词和URL组成,如"MapReduce:file1"keyInfo.set(field +":" + fileName);context.write(keyInfo, valueInfo);}}
}

三、Combine阶段实现
根据Map阶段的输出结果形式,在cn.com.sise.mapreduce.invertindex包下,自定义实现Combine阶段的类InvertedIndexCombiner,该类继承Reducer类,对每个文档的单词进行词频统计,如下图所示。

该类作用:对Map阶段的单词次数聚合处理,并重新设置key值为单词,value值由文档名称和词频组成。
参考代码:

package cn.com.sise.mapreduce.invertedindex;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{ private static Text info = new Text();//输入: <MapReduce:file3 {1,1..>//输出: <MapReduce file3:2>@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{int sum=0;//统计词频for (Text value : values) {sum += Integer.parseInt(value.toString());}int splitIndex = key.toString().indexOf(":");//重新设置value值由URL和词频组成info.set(key.toString().substring(splitIndex +1) +":" + sum);//重新设置key值为单词key.set(key.toString().substring(0, splitIndex));context.write(key, info);}
}

四、Reduce阶段实现
根据Combine阶段的输出结果形式,同样在cn.com.sise.mapreduce.invertindex包下,自定义实现Reducer类InvertedIndexReducer,该类继承Reducer,同上步,略。
该类作用:接收Combine阶段输出的数据,按照最终案例倒排索引文件需求的样式,将单词作为key,多个文档名称和词频连接作为value,输出到目标目录。


参考代码:

package cn.com.sise.mapreduce.invertedindex;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> { private static Text result = new Text();//输入: <MapReduce file3:2>//输出: <MapReduce file1:1;file2:1;file3:2;>@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {//生成文档列表String fileList = new String();for (Text value : values) {fileList += value.toString() +";";}result.set(fileList);context.write(key, result);}}

五、Runner程序主类实现
在同一个包下编写MapReduce程序运行主类InvertedIndexDriver。
该类作用:设置MapReduce工作任务的相关参数,本来采用集群运行模式。因此,需要设置远程HDFS系统源文件目录(hdfs://localhost:9000/user/hadoop/inputdata)以及结果输出目录(hdfs://localhost:9000/user/hadoop/outputdata),设置完毕,运行主程序即可。

参考代码:

package cn.com.sise.mapreduce.invertedindex;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input. FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class InvertedIndexDriver {
/*** @param args* @throws InterruptedException * @throws IOException* @throws ClassNotFoundException*/
public static void main(String[] args) throws ClassNotFoundException,IOException,InterruptedException{ Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(InvertedIndexDriver.class);job.setMapperClass(InvertedIndexMapper.class);job.setCombinerClass(InvertedIndexCombiner.class);    job.setReducerClass(InvertedIndexReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata"));//指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata"));//向yarn集群提交这个jobboolean res =job.waitForCompletion(true);System.exit(res? 0: 1);}
}

六、数据准备
(1)在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件file1.txt,file2.txt,file3.txt。
file1.txt文件内容:MapReduce is simple
file2.txt文件内容:MapReduce is powerful is simple
file3.txt文件内容:Hello MapReduce bye MapReduce
参考命令代码如下:
pwd
cd workspace/MapReduceDemo/
sudo vim file1.txt
sudo vim file2.txt
sudo vim file3.txt
ls

(2)启动hadoop(start-all.sh),在hdfs分布式文件系统目录(/user/hadoop/)中新建inputdata目录,然后将步骤(1)中的3个文件上传到该目录下,参考命令代码如下:
启动Hadoop:


操作完成后记得鼠标右键空白处刷新


把刚刚创建的三个txt文件上传到inputdata目录

七、运行并查看结果(2种方式)

(1)shell命令
终端命令:
hdfs dfs -ls /user/hadoop/outputdata
hdfs dfs -cat /user/hadoop/outputdata/part-r-00000

(2)Eclipse IDE

实验二:数据去重案例实现

假设有数据文件file4.txt和file5.txt,内容分别如下,编程实现2个文件内容去重:
file4.txt内容:
2020-9-1 a
2020-9-2 b
2020-9-3 c
2020-9-4 d
2020-9-5 a
2020-9-6 b
2020-9-7 c
2020-9-3 c

file5.txt内容:
2020-9-1 b
2020-9-2 a
2020-9-3 b
2020-9-4 d
2020-9-5 a
2020-9-6 c
2020-9-7 d
2020-9-3 c
1.在MapReduceDemo项目下新建包cn.com.sise.mapreduce.dedup,思路请参考实验一。

2.Map阶段实现
在包cn.com.sise.mapreduce.dedup下自定义类DedupMapper,该类继承Mapper。
该类作用:读取数据集文件将TextInputFormat默认组件解析的类似<0,2020-9-1 a>键值对修改为<2020-9-1 a,null>

参考代码:

package cn.com.sise.mapreduce.dedup;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private static Text field = new Text();//<0,2020-9-3 c><11,2020-9-4 d>@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ field = value;context.write(field, NullWritable.get());}// <2020-9-3 c.null> <2020-9-4 d.null>
}

3.Reduce阶段实现
在相同包下自定义DedupReducer类,该类继承Reducer。
该类作用:仅接受Map阶段传递过来的数据,根据Shuffle工作原理,键值key相同的数据就会被合并,因此输出的数据就不会出现重复数据了。

参考代码:

package cn.com.sise.mapreduce.dedup;import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {//<2020-9-3 c.null> <2020-9-4 d.null><2020-9-4 d.null>@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {context.write(key, NullWritable.get());        }
}
  1. Runner主程序实现
    编写MapReduce程序运行主类DedupRunner。
    该类作用:设置MapReduce工作任务的相关参数,本案例采用集群运行模式。因此,需要设置远程HDFS系统源文件目录(hdfs://localhost:9000/user/hadoop/inputdata1)以及结果输出目录(hdfs://localhost:9000/user/hadoop/outputdata1),设置完毕,运行主程序即可。

参考代码:

package cn.com.sise.mapreduce.dedup;import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class DedupRunner {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf = new Configuration();Job job =Job.getInstance();job.setJarByClass(DedupRunner.class);job.setMapperClass(DedupMapper.class);job.setReducerClass(DedupReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata1"));//指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata1"));job.waitForCompletion(true);}
}
  1. 数据准备(思路参考实验一)
    在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件file4.txt,file5.txt(内容已给),然后在hdfs分布式文件系统目录(/user/hadoop)中新建inputdata1目录,将本地2个文件上传到该目录下,参考命令代码如下:
    终端命令:
    cd workspace/MapReduceDemo/
    sudo vim file4.txt
    sudo vim file5.txt



6.运行并查看结果(2种方式,请参考实验一)

鼠标右键刷新hadoop

(1)shell命令
终端命令:
hdfs dfs -ls /user/hadoop/outputdata1
hdfs dfs -cat /user/hadoop/outputdata/part-r-00000

(2)Eclipse IDE

实验三:TopN排序案例实现

假设有数据文件num.txt,文件内容如下:
10 3 8 7 6 5 1 2 9 4
11 12 17 14 15 20
19 16 18 13
要求使用MapReduce技术提取上述文本中最大的5个数据,并将最终结果汇总到一个文件中。

1.在MapReduceDemo项目下新建包cn.com.sise.mapreduce.topn,思路请参考实验一和二。

2.Map阶段实现
自定义TopNMapper类,继承Mapper。
该类作用:先将文件中的每行数据进行切割提取,并把数据保存到TreeMap中,判断TreeMap是否大于5,如果大于5就需要移除最小的数据。由于数据是逐行读取,如果这时就向外写数据,那么TreeMap就保存了每一行的最大5个数,因此需要在cleanup()方法中编写context.write()方法,这样就保证了当前MapTask中TreeMap保存了当前文件最大的5条数据后,再输出到Reduce阶段。

参考代码:

package cn.com.sise.mapreduce.topn;import java.util.TreeMap;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();//<0,10 387651294//<xx.11 12 17 14 15 20>@Overridepublic void map(LongWritable key, Text value, Context context) { String line = value.toString();String[] nums = line.split("");for (String num : nums) {repToRecordMap.put(Integer.parseInt(num), " ");if (repToRecordMap.size() >5) {repToRecordMap.remove(repToRecordMap.firstKey());}}}@Overrideprotected void cleanup(Context context) {for (Integer i : repToRecordMap.keySet()){try {context.write(NullWritable.get(), new IntWritable(i));}catch (Exception e){e.printStackTrace();}}
}
}
  1. Reduce阶段
    在同一个包下自定义TopNReducer类,该类继承Reducer。
    该类作用:首先TreeMap自定义排序规则,当需求取最大值时,只需要在compare()方法中返回正数即可满足倒序排序,reduce()方法依然要满足时刻判断TreeMap中存放数据是前5个数,并最终遍历输出最大的5个数。

参考代码:

package cn.com.sise.mapreduce.topn;import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> { private TreeMap<Integer,String>repToRecordMap = new TreeMap<Integer,String>(new Comparator<Integer>(){//返回一个基本类型的整型,谁大谁排后面.//返回负数表示:01小于02//返回0表示:表示: 01和02相等//返回正数表示: 01大于02。public int compare(Integer a, Integer b) {return b-a;}}); public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable value : values) {repToRecordMap.put(value.get(),"");if (repToRecordMap.size()> 5) {repToRecordMap.remove(repToRecordMap.firstKey());}}for (Integer i : repToRecordMap.keySet()) {context.write(NullWritable.get(), new IntWritable(i));}}}
  1. Runner程序主类实现
    在linux本地目录下(/home/hadoop/workspace/MapReduceDemo)新建源文件num.txt(内容已给),然后在hdfs分布式文件系统目录(/user/hadoop)中新建inputdata2目录,将本地2个文件上传到该目录下,参考命令代码如下:

参考代码:

package cn.com.sise.mapreduce.topn;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class TopNRunner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(TopNRunner.class);job.setMapperClass(TopNMapper.class);job.setReducerClass(TopNReducer.class);job.setNumReduceTasks(1);job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的keyjob.setMapOutputValueClass(IntWritable.class);// map阶段的输出的valuejob.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的keyjob.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value.FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/inputdata2"));FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/outputdata2"));boolean res =job.waitForCompletion(true);System.exit(res? 0: 1);}
}





  1. 运行并查看结果

** 到了这一步,本次实验就完成了,你今天学会了吗?**

MapReduce经典案例实战相关推荐

  1. MapReduce经典案例总结

    MapReduce经典案例总结 首先得搭好hadoop环境,windows搭好单机环境 1.根据手机号统计上网流量,统计上行流量.下行流量和总流量 数据如下,文件 flow.log,上传到hadoop ...

  2. OpenCV4经典案例实战教程 笔记

    OpenCV4经典案例实战教程 笔记 这几天在看OpenCV4经典的案例实战教程,这里记录一下学习的过程. 案例一 刀片1的缺陷检测 这里的目的是检测出有缺陷的刀片,如下图. 先总结一下思路,这里首先 ...

  3. Python机器学习经典案例实战-韦玮-专题视频课程

    Python机器学习经典案例实战-3710人已学习 课程介绍         Python在机器学习领域应用是非常广泛的,比如,我们可以使用机器学习进行验证码识别,使用机器学习实现计算机视觉项目,或者 ...

  4. MapReduce经典案例—TopN

    目录 一.问题介绍 (一)案例分析 1. TopN分析法介绍 2. 案例需求及分析 (二)案例实现 1. Map阶段实现 2. Reduce阶段实现 3. Driver程序主类实现 4. 效果测试 二 ...

  5. MapReduce经典案例——统计单词数

    资源文件file.txt hello hadoop hello word this is my first hadoop program 分析:一个文档中每行的单词通过空格分割后获取,经过map阶段可 ...

  6. 《实战网络营销 网络推广经典案例战术解》扫描版[PDF]

    电驴资源 下面是用户共享的文件列表,安装电驴后,您可以点击这些文件名进行下载 一┳═┻︻▃内容简介处附有网盘快速下载通道▃︻┻═┳一 [实战网络营销.网络推广经典案例战术解].扫描版.张书乐.pdf详 ...

  7. 【项目实战合集】计算机视觉毕业设计项目怎么选,超30个经典案例供你选择...

    每年到了搞毕业设计的时候,很多学生朋友都很头疼,指导老师给不了好题目,自己也没有什么好的想法,怕选的太容易了过不了,怕选的太难了做不出!今年我们在计算机视觉方向出了[超过30个基于Pytorch框架] ...

  8. 数仓工具—Hive实战之full join 经典案例(13)

    full join 经典案例 full join 增量数据同步更新 我们知道我们的数仓数据很大一部分是来自业务数据库的,那么这个时候我们数据同步的方式有两种一种是增量同步一种是全量同步,那么这个时候我 ...

  9. 数仓工具—Hive实战之自关联经典案例(11)

    自关联经典案例 其实说到关联我相信很多人都知道,自关联其实是关联的一种,其实我们最常见的是两张不同的表之间的关联,但其实工作中我们能看到的其实还有一种关联,那就是一张表与它自己关联. 下面我们就通过几 ...

  10. SQL优化实战经典案例分析

    前言 大家好,我是黎杜,今天和大家聊聊SQL优化的场景. SQL调优这块呢,大厂面试必问的.最近金九银十嘛,所以整理了SQL的调优思路,并且附几个经典案例分析. 1.慢SQL优化思路. 慢查询日志记录 ...

最新文章

  1. jsr303jsp页面怎么显示错误信息_springmvc使用JSR-303进行数据校验实例
  2. Qt下Undefined reference to 'vtable for xxx'
  3. css字体阴影_css3如何添加文字阴影效果?text-shadow设置文字阴影效果
  4. Android常用代码混淆模板
  5. Linux 之 光标消失隐藏术
  6. HDU4612+Tarjan缩点+BFS求树的直径
  7. 【bzoj4940】这是我自己的发明
  8. Java使用数组学习心得
  9. 登入ftp:500 OOPS: vsf_sysutil_bind, maximum number of attempts to find a listening port exceeded
  10. python ev3图形化编程软件下载_【stm32图形化编程软件免费版下载】stm32图形化编程软件 v1.0 最新免费版-开心电玩...
  11. 芯片设计流程最全讲解
  12. 北京师范大学网络教育计算机考试题,2020年北京师范大学网络教育入学考试高起专语文模拟题及答案(机考古文阅读)...
  13. GNSS 周跳探测方法 之 TurboEdit
  14. win10如何使用低版本的IE浏览器?
  15. 4K工业级高清4进1出DP自动USB KVM多电脑切换器(MT-PK401)
  16. python 把数字日期转换成中文日期
  17. 文件上传(图片上传) 大小限制的配置 及注意点
  18. 2000-2021年 美国国际收支平衡表 德国国际收支平衡表 中国国际收支平衡表……
  19. (一)路径规划算法---Astar与C++可视化在RVIZ的二维栅格地图
  20. IE提示”不支持当前兼容性设置,请在运行此Web页之前禁用兼容性视图”

热门文章

  1. aso优化时高权重的积分墙关键词_怎样做好积分墙关键词的优化
  2. PHP宝塔IDC分销系统,PHP宝塔IDC分销系统,宝塔面板开虚拟主机程序–Bty1.0
  3. 为什么计算机和网络设备都需要接地,一个实例全面讲解机房如何做防雷接地?...
  4. 川土微电子 | CA-IS3050U隔离式CAN收发器
  5. springboot中设置pageSize的默认值
  6. 若依ruoyi框架整合magic-api增删改查Demo
  7. 戴尔计算机进入安全模式后黑屏,电脑进入省电模式黑屏怎么恢复
  8. Excel--indirect函数(间接引用)
  9. 80286微处理器和80386的优缺点
  10. java将文件移动到另一个目录