1、需求场景:
   过滤无意义的单词后再进行文本词频统计。处理流程是:
1)第一个Map使用无意义单词数组过滤输入流;
2)第二个Map将过滤后的单词加上出现一次的标签;
3)最后Reduce输出词频;
MapReduce适合高吞吐高延迟的批处理,对于数据集迭代支持比较弱,唯有这个Chain具备。

2、具体代码如下:

package com.word;import java.io.IOException;
import java.util.HashSet;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class ChainWordCount {//过滤无意义单词的第一个Mappublic static class FilterMapper extends Mapper<Object, Text, Text, Text>{private final static String[] StopWord = {"a","an","the","of","in","to","and","at","as","with"};private HashSet<String> StopWordSet;private Text word = new Text();//setup函数在Map task启动之后立即执行public void setup(Context context) throws IOException,InterruptedException{StopWordSet=new HashSet<String>();for(int i=0;i<StopWord.length;i++){StopWordSet.add(StopWord[i]);}}       //将 输入流中无意义的单词过滤掉public void map(Object key, Text value, Context context)throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {String aword=itr.nextToken();//获取字符if(!StopWordSet.contains(aword)){//不包含无意义单词word.set(aword);context.write(word, new Text(""));}}}}//记录单词标签第二个Mappublic static class TokenizerMapper extends Mapper<Text, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);   public void map(Text key, Text value, Context context)throws IOException, InterruptedException {context.write(key, one);}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: ChainWordCount <in> <out>");System.exit(2);}Job job = new Job(conf, "ChainWordCount");job.setJarByClass(ChainWordCount.class);//第一个map加入作业流JobConf map1Conf=new JobConf(false);ChainMapper.addMapper(job, FilterMapper.class, Object.class, Text.class, Text.class, Text.class, map1Conf);//第二个map加入作业流JobConf map2Conf=new JobConf(false);ChainMapper.addMapper(job, TokenizerMapper.class, Text.class, Text.class, Text.class, IntWritable.class, map2Conf);//将词频统计的Reduce设置成作业流唯一的ReduceJobConf redConf=new JobConf(false);ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, redConf);job.setNumReduceTasks(1);//设置reduce输出文件数FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
/** 统计的输入文件:hadoop fs -put /var/log/boot.log /tmp/fjs/* 结果输出文件:/tmp/fjs/cwcout* 执行命令:hadoop jar /mnt/ChainWordCount.jar /tmp/fjs/boot.log /tmp/fjs/cwcout*/

MapReduce基础开发之十二ChainMapper和ChainReducer使用相关推荐

  1. MapReduce基础开发之十读写ORC File

    1.ORC File Orc是Hive特有的一种列式存储的文件格式,它有着非常高的压缩比和读取效率,因此很快取代了之前的RCFile,成为Hive中非常常用的一种文件格式. 2.编译ORC Jar包 ...

  2. python 基础系列(十二) — python正则

    python 基础系列(十二) - python正则 1. 正则表达式基础 1.1. 简单介绍 正则表达式并不是Python的一部分.正则表达式是用于处理字符串的强大工具,拥有自己独特的语法以及一个独 ...

  3. 陈力:传智播客古代 珍宝币 泡泡龙游戏开发第十二讲:盒子的定位方式

    陈力:传智播客古代 珍宝币 泡泡龙游戏开发第十二讲:盒子的定位方式 摘要:通过前节<第十一讲:浮动>学习了贵阳网站建设中的DIV+CSS中盒子模型和浮动进行介绍.框模型是CSS的基础,本文 ...

  4. IM开发者的零基础通信技术入门(十二):上网卡顿?网络掉线?一文即懂!

    [来源申明]本文引用了微信公众号"鲜枣课堂"的<上网慢?经常掉线?这篇文章告诉你该怎么办!>文章内容.为了更好的内容呈现,即时通讯网在引用和收录时内容有改动,转载时请注 ...

  5. LINUX学习基础篇(十二)痕迹命令

    LINUX学习基础篇(十二)痕迹命令 系统痕迹命令 w命令 who命令 last命令 lastlog命令 lastb命令 系统痕迹命令 系统中有一些重要的痕迹日志文件,如/var/log/wtmp./ ...

  6. 疯狂的大柚柚带你玩转MSP-ESP430G2(基础篇)----(十二)AD转换器

    疯狂的大柚柚带你玩转MSP-ESP430G2(基础篇) (十二)AD转换器 ADC10是MSP430 单片机的片上模数转换器,其转换位数为10比特,该模块内部是一个SAR型的AD内核,可以在片内产生参 ...

  7. MapReduce基础开发之二数据去重和排序

    因Hadoop集群平台网络限制,只能在eclipse里先写好代码再提交jar到集群平台namenode上执行,不能实时调试,所以没有配置eclipse的hadoop开发环境,只是引入了hadoop的l ...

  8. 【Visual C 】游戏开发笔记十二 游戏输入消息处理 一 键盘消息处理

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 本系列文 ...

  9. 【Visual C++】游戏开发笔记十二 游戏输入消息处理(一) 键盘消息处理

    相信大家都熟悉<仙剑奇侠传98柔情版>的人机交互方式,用的仅仅是键盘.在那个物质并不充裕的时代,一台配置并不高的电脑,一款名叫<仙剑奇侠传>的游戏,却能承载一代人对梦想的追逐. ...

最新文章

  1. 机械制图手册_42条机械制图基础常识,带徒师傅必备!
  2. 通用机器学习流程与问题解决架构模板
  3. vue-router使用next()跳转到指定路径时会无限循环
  4. python开发专属表情包_Python开发个人专属表情包网站
  5. springboot 返回字符串带引号_SpringBoot-基础
  6. webpack4.x中使用postcss-loader、autoprefixer给CSS属性自动添加前缀
  7. Flutter之播放视频
  8. svn合并不同树_SVN分支与合并【超详细的图文教程】(转载)
  9. 四翼扑翼机飞控原理解析
  10. 提前实现盈利的快手还有多少成长空间?
  11. 华硕路由器官改/梅林 设置外置USB-JFFS解救NAND
  12. 只要3步,如何用Python发送通知到微信?
  13. 旧文 2012.12.07 关于和赛扶
  14. win7利用pycharm代码连接夜神模拟器运行appium,被杀进程怎么办
  15. SSRNet:用于大规模点云表面重建的深度学习网络(CVPR2020)
  16. H5头像完整制作,可拖拽缩放,可添加装饰图标(装饰图标支持缩放、旋转、拖拽)
  17. 2012 SDCC中国软件开发者大会门票社区团购火热开启!
  18. 29-折半查找法的使用
  19. 最全中文3DMAX四边形散布插件QuadScatter使用教程
  20. 网站渗透测试服务 squid反向代理代码执行漏洞的挖掘

热门文章

  1. 一周一论文(翻译 总结)— [SOCC 14] DaRPC: Data Center RPC 基于RDMA的高性能通信RPC
  2. java poi jar maven_导出maven项目依赖的jar包(图文教程)
  3. Otter 异地机房数据同步的demo实施
  4. 代码迁移之旅(二)- 渐进式迁移方案
  5. Java线程池--ThreadPoolExecutor
  6. SilverlightMVVM模式中的数据校验
  7. HDOJ 1143 Tri Tiling
  8. android 拨打电话 号码判断
  9. 设置ASP.NET中的TextBox控件不缓存上次输入的信息
  10. Android驱动开发之Hello实例(基于高通msm8909)