简单的MapReduce实践

文章目录

  • 简单的MapReduce实践
    • 操作环境
    • 实现文件合并和去重操作
      • 新建项目
      • 新建Java程序
      • 打包程序
      • 运行程序
    • 实现文件的倒排索引
      • 第一步,Map
      • 第二步,Combiner
      • 第三步,Reduce
      • 配置参数
      • 总体代码
    • 参考文章

操作环境

  • 操作系统:Ubuntu 16.04
  • JDK 版本:1.8
  • Hadoop 版本:Hadoop 3.1.3
  • Java IDE:Eclipse

我的 Hadoop安装目录是“/usr/local/hadoop”,环境变量 HDAOOP_HOME也是这个目录,在博客中看到“/usr/local/hadoop”这样的目录或环境变量 HADOOP_HOME,请记得转换为自己的 Hadoop安装目录。

实现文件合并和去重操作

对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并, 并剔除其中重复的内容,得到一个新的输出文件 C。

输入文件A 样例

hadoop
spark
flink
storm
s4
pig
hive
hbase
spark
sql

输入文件B 样例

model
view
controller
hadoop
spark

合并去重之后的输出文件C 样例如下

controller
flink
hadoop
hbase
hive
model
pig
s4
spark
sql
storm
view

我们先在 home目录下建立两个文件 A.txt、B.txt,并把样例内容输入进去。在之后的步骤中我们再上传这两个文件到 HDFS中。

新建项目

我们启动 Eclipse,在菜单栏选择 “File”->“New”->“Java Project”,创建一个新的 Java项目。

在“Project name”中输入工程名称,这里我们就叫“MapReduce_Practice”。勾选“Use defaul locationt”,让工程文件保存在我们设置的 Eclipse的工作区里。JRE部分选择“Use a project specific JRE”,使用我们自己安装的 JDK版本。然后点击“next”,进入下一步。

我们需要为项目导入必要的 JAR包,这些 JAR包中包含了可以访问 MapReduce的 Java API。JAR包的位置在“Hadoop安装目录/share/hadoop”目录下。比如我的是在“/usr/local/hadoop/share/hadoop”目录下,下面的操作中请选择到自己配置的 hadoop目录下导入 JAR包。

我们点击标题栏的“Libraries”,点击“Add Externtal JARs”

在新的弹窗中,我们通过选择上面的文件目录,进入“/usr/local/hadoop/share/hadoop”目录,记住是进入自己的Hadoop安装目录

我们需要向 Java工程中添加以下 JAR包:

  • “/usr/local/hadoop/share/hadoop/common”目录下的所有 JAR包,即 hadoop-common-3.1.3.jar、hadoop-common-3.1.3-tests.jar、haoop-nfs-3.1.3.jar和、haoop-kms-3.1.3.jar,不包括 jdiff、lib、sources、webapps四个文件夹。
  • “/usr/local/hadoop/share/hadoop/common/lib”目录下的所有 JAR包
  • “/usr/local/hadoop/share/hadoop/mapreduce”目录下的所有 JAR包。同样地,不包括 jdiff、lib、sources、webapps四个文件夹。
  • “/usr/local/hadoop/share/hadoop/mapreduce/lib”目录下的所有 JAR包

我们分四次操作,把需要的 JAR包全部导入进来(lib目录下的 JAR包太多,我们可以使用 Ctrl+A快捷键进行全选)。所需的 JAR包全部添加完毕以后,我们点击右下角的“Finish”,完成 Java工程的创建。

新建Java程序

我们开始新建一个 Java程序,在 Eclipse界面左侧找到我们刚才创建的项目,点击鼠标右键,选择“New”->“Class”。

在“Package”中填入包名,这里我们填“test”。在“Name”中输入程序的名字,这里我们就叫“Merge”。其他的设置都保持默认,点击“finish”。

界面如下

接下来,我们就开始编写实现文件合并和去重操作的 MapReduce程序了。

这个程序比较简单,就分为 Map和 Reduce两步。

  • 在 Map中,直接记录每一个单词即可,将输入中的 value复制到输出数据的 key上。
  • 在 Reduce中更简单,直接根据 key来划分的,相同的 key放在一起,将输入中的 key复制到输出数据的 key上,写一次 context即可。
package test;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Merge {/*对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C*///重载map函数public static class Map extends Mapper<Object, Text, Text, Text>{private static Text text = new Text();public void map(Object key, Text value, Context context)throws IOException,InterruptedException{text = value;context.write(text, new Text(""));}}//重载reduce函数public static class Reduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context)throws IOException,InterruptedException{context.write(key, new Text(""));}}public static void main(String[] args) {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://localhost:9000");System.setProperty("HADOOP_USER_NAME", "hadoop");String[] otherArgs = new String[] {"input", "output"};if(otherArgs.length != 2) {System.err.println("Usage: Merge <in> <out>");System.exit(2);}try {Job job = Job.getInstance(conf, "Merge and duplicate removal");job.setJarByClass(Merge.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true)? 0 : 1);} catch (Exception e) {e.printStackTrace();}}
}

打包程序

下面我们需要把刚才编写的 Java程序打成 JAR包,部署到 Hadoop平台上去运行。

我们在 Hadoop目录下,使用命令行创建一个 myapp目录,用来存放打好的 JAR包。

cd $HADOOP_HOME
mkdir myapp

我们回到 Eclipse,在左侧的“Package Explorer”面板中,找到我们的工程“MapReduce_Practice”,点击鼠标右键,在弹出的选项中选择“Export”。

在弹出的窗口中选择“Java”->“Runnable JAR file”,点击“Next”。

将下列三项配置完成,然后点击“Finish”。

  • “Launch configuration”用于设置 JAR包被部署时运行的主类,我们需要在下拉列表中选择刚才编写的“Merge”。
  • “Export destination”用于设置 JAR包保存的位置,这里我们直接就设置为刚才新建的 myapp目录,即“/usr/local/hadoop/myapp/Merge.jar”。
  • “Library handling”用于设置打包的方式,我们选择“Extract required libraries into generated JAR”。

如果想知道打包方式之间的具体区别,请参考这篇博客 eclipse 导出可运行jar包时三种Library handling的区别,这里我们记住选择“Extract required libraries into generated JAR”就可以了。

点击“Finish”之后,系统会弹出警告,忽略掉即可,直接点击下方的“OK”按钮,将程序进行打包。

打包完成后,会弹出一个警告窗口进行提示,点击“OK”即可。

到这一步,我们已经成功把 Java程序打包为 JAR包并放置在了指定目录,我们可以在终端中进行查看。

cd /usr/local/hadoop/myapp
ll

运行程序

运行 JAR包之前,我们需要做三步准备工作。

第一步,打开 Terminal终端,用命令行启动 Hadoop进程

cd $HADOOP_HOME
./sbin/start-dfs.sh

第二步,删除用户目录下之前存在的 input、output文件夹(如果没有这两个文件夹则跳过这一步)

cd $HADOOP_HOME
./bin/hdfs dfs -rm -r input
./bin/hdfs dfs -rm -r output

第三步,在用户目录下建立 input文件夹,并将之前创建的 A.txt、B.txt文件上传到 input文件夹中

cd $HADOOP_HOME
./bin/hdfs dfs -mkdir input
./bin/hdfs dfs -put ~/A.txt input
./bin/hdfs dfs -put ~/B.txt input

准备工作完成后,我们就可以使用 hadoop jar来运行 JAR包了

cd $HADOOP_HOME
./bin/hadoop jar ./myapp/Merge.jar input output

记住,这里我们不需要建立 output文件夹,hadoop运行过程中会自动建立的。

运行结束后,输入文件的合并和去重结果就写入 output文件夹中了,我们可以输入命令查看结果

./bin/hdfs dfs -cat output/*

可以看到,文件的合并和去重操作顺利完成。由于 Hadoop的设定,如果要再次运行 Merge.jar程序,必须先删除 output文件夹。

实现文件的倒排索引

编写 MapReduce 程序,实现对多个输入文件的内容建立倒排索引,输出单词到文档的映射关系及单词在该文档中的出现次数。

我们建立一个新的 class,名称为 ReverseIndex,包名还是 test。这里我们主要讲解代码,具体的操作细节自行参考上面的文件合并。输入数据仍然采用 input文件夹中的 A.txt、B.txt。

这个程序比文件合并稍微复杂一些。需要记录文件名,还要统计单词在文件中出现的次数。我们分为三步进行,Map-Combiner-Reduce。

第一步,Map

因为需要输出单词到文档的映射关系及单词在该文档中的出现次数,所以我们需要获取文件的名称。在 Map中,通过 FileSplit获取文件的完整路径,切割掉最后一个“/”之前的字符,就得到了文件的名称。然后,用“–”作为连接符,将单词和文件名放在一起作为key值,value值就填为1。

public static class myMap extends Mapper<Object, Text, Text, Text>{public void map(Object key, Text value, Context context) throws IOException,InterruptedException{FileSplit fileSplit = (FileSplit)context.getInputSplit();String filePath = fileSplit.getPath().toString();String fileName = filePath.substring(filePath.lastIndexOf("/")+1);context.write(new Text(value+"--"+fileName), new Text("1"));}
}

第二步,Combiner

在 Combiner中进行预处理,统计单词在文件中出现的次数。将单词和文件名拆分开,word[0]是单词,word[1]是文件名。设置key值为单词,设置value值为文件名加次数。

public static class myCombiner extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{int sum = 0;for(Text v:values) {sum++;}String[] word = key.toString().split("--");context.write(new Text(word[0]), new Text(word[1]+" show: "+sum+" times"));}
}

第三步,Reduce

MapReduce固定的输出格式中会在 value的开头加 tab制表符,我们需要调整一下输出格式。在 Reduce中,给 value加入适当的回车符和制表符。之前经过了 myCombiner类的预处理,可以直接输出了。key值就是单词,value值是文件名加单词次数。

public static class myReduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{StringBuilder wordIndex = new StringBuilder();for(Text v:values) {wordIndex.append(v.toString()).append("\n\t");}context.write(new Text(key.toString()+"\n"), new Text(wordIndex.toString()));}
}

配置参数

在主函数中配置必要的参数。指定 Conf配置项,设置 Job的使用类和输出类型,设置文件的输入输出路径,并使用 try-catch语句来处理异常。

这里我们有一个优化,如果文件路径下存在 output文件夹则自动删除,这样避免了每次手动删除 output文件夹的麻烦。

//指定Conf配置项
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
System.setProperty("HDAOOP_USER_NAME", "hadoop")try {//设置JobJob job = Job.getInstance(conf, "ReverseIndex");job.setJarByClass(ReverseIndex.class);job.setMapperClass(myMap.class);job.setCombinerClass(myCombiner.class);job.setReducerClass(myReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//设置文件路径Path input = new Path("/user/hadoop/input");Path output = new Path("/user/hadoop/output");//自动删除output文件夹FileSystem fs = FileSystem.get(conf);if(fs.exists(output)) {fs.delete(output, true);}FileInputFormat.addInputPath(job, input);FileOutputFormat.setOutputPath(job, output);System.exit(job.waitForCompletion(true)?0:1);} catch (Exception e) {e.printStackTrace();System.exit(1);
}

总体代码

package test;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ReverseIndex {public static class myMap extends Mapper<Object, Text, Text, Text>{public void map(Object key, Text value, Context context) throws IOException,InterruptedException{FileSplit fileSplit = (FileSplit)context.getInputSplit();String filePath = fileSplit.getPath().toString();String fileName = filePath.substring(filePath.lastIndexOf("/")+1);context.write(new Text(value+"--"+fileName), new Text("1"));}}public static class myCombiner extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{int sum = 0;for(Text v:values) {sum++;}String[] word = key.toString().split("--");context.write(new Text(word[0]), new Text(word[1]+" show: "+sum+" times"));}}public static class myReduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{StringBuilder wordIndex = new StringBuilder();for(Text v:values) {wordIndex.append(v.toString()).append("\n\t");}context.write(new Text(key.toString()+"\n"), new Text(wordIndex.toString()));}}public static void main(String[] args) {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://localhost:9000");System.setProperty("HDAOOP_USER_NAME", "hadoop")try {Job job = Job.getInstance(conf, "ReverseIndex");job.setJarByClass(ReverseIndex.class);job.setMapperClass(myMap.class);job.setCombinerClass(myCombiner.class);job.setReducerClass(myReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path input = new Path("/user/hadoop/input");Path output = new Path("/user/hadoop/output");FileSystem fs = FileSystem.get(conf);if(fs.exists(output)) {fs.delete(output, true);}FileInputFormat.addInputPath(job, input);FileOutputFormat.setOutputPath(job, output);System.exit(job.waitForCompletion(true)?0:1);} catch (Exception e) {e.printStackTrace();System.exit(1);}}
}

最终执行结果如下

参考文章

MapReduce编程实践(Hadoop3.1.3)

简单的HDFS操作

MapReduce编程(二) 文件合并和去重

hadoop < MapReduce 编写程序 实现倒排索引>

简单的MapReduce实践相关推荐

  1. 简单的MapReduce项目,计算文件中单词出现的次数

    简单的MapReduce项目,计算文件中单词出现的次数 计算文件中单词出现的次数,试题如下图 1.创建读取单词的文件tast,内容如下: hadoop core map reduce hiv hba ...

  2. 简单的MongoDB实践

    简单的MongoDB实践 文章目录 简单的MongoDB实践 操作环境 MongoDB认识 查看配置文件 MongoDB与SQL术语 MongoDB外部命令 常用Shell命令 使用MongoDB的S ...

  3. 简单的HBase实践

    简单的HBase实践 文章目录 简单的HBase实践 操作环境 HBase常用shell命令 使用HBase的shell命令 命令行启动HBase并预置数据 1.列出 HBase 中所有的表 2.打印 ...

  4. 如何简单解释 MapReduce算法

    原文地址:如何简单解释 MapReduce 算法 在Hackbright做导师期间,我被要求向技术背景有限的学生解释MapReduce算法,于是我想出了一个有趣的例子,用以阐释它是如何工作的. 例子 ...

  5. linux hadoop eclipse 安装,linux下安装Hadoopeclipse插件以及编写第一个简单的MapReduce程序...

    linux下安装Hadoopeclipse插件以及编写第一个简单的MapReduce程序 Hadoop 安装eclipse这个不难.网上太多的教程,一找一大把.熟悉了之后也不再需要看教程就可以自己安装 ...

  6. [MaxCompute MapReduce实践]通过简单瘦身,解决Dataworks 10M文件限制问题

    用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业. 解决方案: 第一步:大于1 ...

  7. 简单解释 MapReduce 算法

    一个有趣的例子 你想数出一摞牌中有多少张黑桃.直观方式是一张一张检查并且数出有多少张是黑桃? MapReduce方法则是: 给在座的所有玩家中分配这摞牌 让每个玩家数自己手中的牌有几张是黑桃,然后把这 ...

  8. python documents in chinese_基于 Python 的简单自然语言处理实践

    基于 Python 的简单自然语言处理 Twenty News Group 语料集处理 20 Newsgroup 数据集包含了约 20000 篇来自于不同的新闻组的文档,最早由 Ken Lang 搜集 ...

  9. 一次简单的 ViewModel 实践

    原文链接:http://bifidy.net/index.php/407 ViewModel 这个概念是基于 MVVM 结构提出的,全称应该叫做 Model-View-ViewModel,从结构上来说 ...

最新文章

  1. Map Reduce Shuffle
  2. 肠道菌群机制研究及国自然课题设计专题会议
  3. 《Unity虚拟现实开发实战》——第1章,第1.8节小结
  4. javascript DOM(08-21)
  5. Linux编译内核出错怎么抓log,内核编译失败
  6. Linux下的LVM
  7. python字符串出栈方法_Python学习之路_day_04(字符串与列表的内置方法)
  8. 地籍cad的lisp程序大集合_超经典CAD_lisp程序集锦、CAD快捷键大全
  9. 线性同余法求随机数python实现_百面机器学习笔记 | 第八章:采样 | 02 均匀分布随机数...
  10. 数据结构课程设计——校园导游
  11. Excel中的Vlookup函数操作实例
  12. 自定义mvc框架复习(crud)
  13. [论文翻译]Deep Learning 翻译及阅读笔记
  14. 工具系列————教育邮箱激活Clion
  15. 传奇服务器最多登录人数设置,传奇服务器中如何设置角色升级经验值数量
  16. manjaro双系统 windows_Linux manjaro与Windows10双系统安装
  17. QT调用dumpcpp.exe让Active、dll生成.h.cpp
  18. TP-Link路由WDS设置心得
  19. 资产设备智能化数字化管理系统在水务环保行业的应用
  20. 机器学习(西瓜书)注解:第12章 计算学习理论

热门文章

  1. Python操作Memcached
  2. Andriod --- JetPack (四):BaseObservable 与 ObservableField 双向绑定
  3. Android --- ListView之高度由 item 的个数决定,wrap_content有效
  4. java事务过大影响系统性能吗_Java编程性能优化-影响性能的因素你都知道吗?
  5. php 7.1.5,Centos 7平滑无缝更新PHP7.1.0到PHP 7.1.5
  6. 暖通空调水系统分类、“管制”和同程异程式的优缺点
  7. 数据中心支持物联网的5种方式
  8. 数据中心的运维管理原则(一)
  9. 智能电源分配PDU应用
  10. python入门之控制结构-循环结构_Python 入门之控制结构 - 循环结构(一)