新手入门MapReduce实现改进版WordCount词频统计

一、实验任务要求

本实验是为了实现改进版的词频统计WordCount。要求根据所给的英文名著数据集和停用词表,统计英文名著数据集中词频,过滤掉停用词,将统计结果按照词频降序排序,词频相同的词按照字典序排序。

英文名著数据集和停用词表再此处(提取码:ceij)

二、实验工具和环境配置说明

电脑安装了Vmware软件,搭建Centos7系统环境,配置了Hadoop2.7.4单机伪分布环境并安装eclipse编程软件,使用Java语言完成实验任务。

三、步骤

首先我们对实验任务进行划分,将实验任务划分为两个子任务:词频统计和排序,针对两个子任务job分别进行设计。

1.词频统计

(1)首先,我们在这个任务阶段需要英文名著数据集和停用词表,统计是针对英文名著数据集,但要过滤掉停用词,所以map阶段要判断接收到的词是不是停用词,不是则输出。这就要保证每个map都能使用停用词表,所以,就要使用map中的Setup把这个停用词表变成一个全局变量;

在main函数里的第一个job中加入缓存文件停用词表stopwords.txt。

在Mapper中的设置一个空列表li,在Mapper中的setup阶段读缓存文件,将其中的每一个停用词加入到列表中。
(2)获得了全局变量停用词列表后,接下来就要获得文章的每个词,判断词如果不是停用词,输出<key=词,value=词频1>。

将英文名著按行读取,使用空格等标点符号分割成一个个词,将词化为小写,如果词不在停用词列表中,输出。
(3)Reducer收到词,将key即相同的词merge合并累计词频。

2.排序

(1)首先,我们排序是对任务一的词频文件进行排序,由任务一job1,我们已经获得了统计好的词频文件存储在test文件夹中,将词频统计job1的输出路径作为排序job2的输入路径;

(2)然而我们排序不是简单的按词的字典序排序,也不是按照词频排序,而是将二者综合在一起排序,所以我们要自定义排序方式,由于MapReduce是根据key来排序,所以我们要自定义key的数据类型从而自定义排序方式。

我自定义了一个数据类型myclass,其中两个变量x代表词频,y代表词。

排序方式定义为按词频降序,词频相同按照词的字典序排序。

(3)Mapper中读取词和词频,构造输出<key,value>为类型为myclass,IntWritable。

(4)Reducer把输出的key再改回词。

实验结果与展示:



所有代码如下:


import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.file.FileStore;
import java.nio.file.PathMatcher;
import java.nio.file.WatchService;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();static List<String> li = new ArrayList<String>();@Overrideprotected void setup(Context context)throws IOException, InterruptedException {//获取缓存文件路径的数组Path [] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());System.out.println(paths);BufferedReader sb = new BufferedReader(new FileReader(paths[0].toUri().getPath()));//读取BufferedReader里面的数据String tmp = null;while ( (tmp = sb.readLine()) != null) {String ss []= tmp.split(" ");for (String s : ss) {li.add(s);}}//关闭sb对象sb.close();System.out.println("+++++++"+li);}public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString()," \'\",!.?(){}-:;@#&*/[]`s<>_");while (itr.hasMoreTokens()) {String tmpword = itr.nextToken();if(!li.contains(tmpword.toLowerCase())){word.set(tmpword.toLowerCase());context.write(word, one);}}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static class myclass implements WritableComparable<myclass> {public int x;public String y;public int getX() {return x;}public String getY() {return y;}public void readFields(DataInput in) throws IOException {x = in.readInt();y = in.readUTF();}public void write(DataOutput out) throws IOException {out.writeInt(x);out.writeUTF(y);}public int compareTo(myclass p) {if (this.x > p.x) {return -1;} else if (this.x < p.x) {return 1;} else {if (this.getY().compareTo(p.getY()) < 0) {return -1;} else if (this.getY().compareTo(p.getY()) > 0) {return 1;} else {return 0;}}}}public static class Mysorter extends IntWritable.Comparator {public int compare(WritableComparable a, WritableComparable b) {return -super.compare(a, b);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return -super.compare(b1, s1, l1, b2, s2, l2);}}public static class TokenizerMapper1 extends Mapper<Object, Text, myclass, IntWritable>{//private Text keyInfo = new Text();  private IntWritable valueInfo = new IntWritable();@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer( value.toString());String[] word = value.toString().split("\\s+");myclass keyInfo = new myclass();keyInfo.x=Integer.parseInt(word[word.length-1]);keyInfo.y=word[word.length-2];//keyInfo.set( word[0]);          valueInfo.set(Integer.parseInt(word[word.length-1]));context.write(keyInfo, valueInfo);   }
}public static class IntSumReducer1 extends Reducer<myclass,IntWritable,Text,IntWritable> {private IntWritable valueInfo = new IntWritable();private Text keyInfo = new Text(); public void reduce(myclass key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {keyInfo.set(key.y);valueInfo.set(key.x);context.write(keyInfo,valueInfo);}}public static void main(String[] args) throws Exception {//任务一Configuration conf = new Configuration();//FileSystem hdfs= FileSystem.get(conf);//conf.set("stop", "hdfs://localhost:9000/input/stopwords.txt");//DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/discache"), conf);//String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();String[] otherArgs = new String[]{"hdfs://localhost:9000/input/index/data","hdfs://localhost:9000/output/test"};if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job1 = Job.getInstance(conf, "word count");job1.setJarByClass(WordCount.class);//设置分布式缓存文件job1.addCacheFile(new URI("hdfs://localhost:9000/input/stopwords/stopwords.txt"));job1.setMapperClass(TokenizerMapper.class);job1.setCombinerClass(IntSumReducer.class);job1.setReducerClass(IntSumReducer.class);job1.setOutputKeyClass(Text.class);job1.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job1, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job1,new Path(otherArgs[otherArgs.length - 1]));job1.waitForCompletion(true);//任务2Configuration conf1 = new Configuration();Job job2 = Job.getInstance(conf1, "sort");job2.setMapperClass(TokenizerMapper1.class);job2.setReducerClass(IntSumReducer1.class);job2.setMapOutputKeyClass(myclass.class);job2.setMapOutputValueClass(IntWritable.class);job2.setOutputKeyClass(Text.class);job2.setOutputValueClass(IntWritable.class);//job1.setSortComparatorClass(Mysorter.class);FileInputFormat.addInputPath(job2, new Path(otherArgs[otherArgs.length - 1]));FileOutputFormat.setOutputPath(job2,new Path("hdfs://localhost:9000/output/sort"));System.exit(job2.waitForCompletion(true) ? 0 : 1);}
}

MapReduce实现改进版WordCount词频统计相关推荐

  1. MapReduce编写实现wordcount词频统计

    p>首先编写WordCountDriver: package com.jym.hadoop.mr.demo; import java.io.IOException; import org.apa ...

  2. Hadoop | MapReduce之 WordCount词频统计

    WordCount词频统计 词频统计 WordCountMap.java // Map类,继承于org.apache.hadoop.mapreduce.Mapper; public class Wor ...

  3. WordCount词频统计

    @WordCount词频统计详解(乱序版) WordCount主要分三部分: WordCountMain.WordCountMapper.WordcountReducer WordCountMain: ...

  4. 从零开始 之 使用 MapReduce 对文件进行词频统计

    文章目录 1. Linux的选择 2. 在 Ubuntu 中安装 Hadoop 2.1 创建Hadoop用户 2.2 下载并安装 Hadoop3.3.1 3. 安装 Java 环境 3.1 下载并进行 ...

  5. python写wordcount_Python开发Spark应用之Wordcount词频统计

    一个早上只做了一点微小的工作,很忏愧.但是发现Spark这玩意还是蛮有意思的.下面给大家介绍一下如何用python跑一遍Wordcount的词频统计的示例程序. 在operator模块中导入add类f ...

  6. 12.MapReduce第2部分(WordCount词频统计、自然连接)

    一.程序要求 二.WordCount设计思路 假设三个分片,分别输入到三个不同的Map任务中去 行号:key 内容:value 三.MapReduce的具体应用之自然连接 举例子:

  7. MapReduce实现词频统计

    问题描述:现在有n个文本文件,使用MapReduce的方法实现词频统计. 附上统计词频的关键代码,首先是一个通用的MapReduce模块: class MapReduce:__doc__ = '''提 ...

  8. Hadoop的环境配置——搭建一个主机hadoop102,两个从机hadoop103,hadoop104,并运行分布式词频统计

    本文是跟着B站上的视频实现的,链接如下: https://www.bilibili.com/video/BV1Qp4y1n7EN?p=18 Hadoop运行环境搭建 重来3遍是正常的,这篇针对的是怎么 ...

  9. Hadoop实例之利用MapReduce实现Wordcount单词统计 (附源代码)

    大致思路是将hdfs上的文本作为输入,MapReduce通过InputFormat会将文本进行切片处理,并将每行的首字母相对于文本文件的首地址的偏移量作为输入键值对的key,文本内容作为输入键值对的v ...

最新文章

  1. 搜索引擎早期重要论文推荐系列【7】《Searching the Web》
  2. Mathematica 画图操作中的一些小惊喜
  3. 手把手教你D2C,走向前端智能化
  4. runloop - 面试题
  5. Anaconda下改变python的版本
  6. warning: C4819: 该文件包含不能在当前代码页(936)中表示的字符。请将该文件保存为 Unicode 格式以防止数据丢失
  7. [转]XCode调试 设置全局断点并快速定位问题代码所在行
  8. 用R命令看一下各个寄存器的设置情况
  9. 2007中国优秀手机客户端软件和无线互联网
  10. Android PackageInstaller 静默安装的实现(附源码)
  11. Chorme 模拟分辨率设置
  12. Baklib知识库-企业知识库管理平台
  13. 判断一个整数是否是7的倍数
  14. 描写计算机教室的词语,关于描写教室的词语
  15. DDR3学习总结(二)
  16. 利用CNN进行句子分类的敏感性分析
  17. CUDA学习(十一):原子操作实现向量内积
  18. Argument of type ‘‘ is not assignable to parameter of type ‘never‘.
  19. 对于编码器与解码器的理解
  20. 解决git时出现error: src refspec master does not match any问题

热门文章

  1. 《计算复杂性:现代方法》——导读
  2. Kafka生产者——消息发送流程,同步、异步发送API
  3. 项目二 linux的管理与维护
  4. IBM DS4300磁盘损坏事故的数据恢复解决思路
  5. esxi6.7虚拟机装服务器上不认u盘,vmware ESXi 6.7 识别不到USB 网卡(示例代码)
  6. MSSQL SERVER中易混淆的数据类型
  7. win10+python3.7下安装scrapy
  8. 浅谈如何正确选择及使用电流互感器
  9. 一文说透 Jarvis+ 去中心化社群经济
  10. 一步一步成为一个Linux开发人员