027_编写MapReduce的模板类Mapper、Reducer和Driver
模板类编写好后写MapReduce程序,的模板类编写好以后只需要改参数就行了,代码如下:
1 package org.dragon.hadoop.mr.module; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 18 import org.apache.hadoop.util.Tool; 19 import org.apache.hadoop.util.ToolRunner; 20 21 /** 22 * 23 * ########################################### 24 * ############ MapReduce 模板类 ########## 25 * ########################################### 26 * 27 * @author ZhuXY 28 * @time 2016-3-13 下午10:21:06 29 * 30 */ 31 public class ModuleMapReduce extends Configured implements Tool { 32 33 /** 34 * Mapper Class 35 */ 36 public static class ModuleMapper extends 37 Mapper<LongWritable, Text, LongWritable, Text> { 38 39 @Override 40 protected void setup(Context context) throws IOException, 41 InterruptedException { 42 super.setup(context); 43 } 44 45 @Override 46 protected void map(LongWritable key, Text value, Context context) 47 throws IOException, InterruptedException { 48 super.map(key, value, context); 49 } 50 51 @Override 52 protected void cleanup(Context context) throws IOException, 53 InterruptedException { 54 super.cleanup(context); 55 } 56 } 57 58 /** 59 * Reducer Class 60 */ 61 public static class ModuleReducer extends 62 Reducer<LongWritable, Text, LongWritable, Text> { 63 64 @Override 65 protected void setup(Context context) throws IOException, 66 InterruptedException { 67 // TODO Auto-generated method stub 68 super.setup(context); 69 } 70 71 @Override 72 protected void reduce(LongWritable key, Iterable<Text> values, 73 Context context) throws IOException, InterruptedException { 74 // TODO Auto-generated method stub 75 super.reduce(key, values, context); 76 } 77 78 @Override 79 protected void cleanup(Context context) throws IOException, 80 InterruptedException { 81 // TODO Auto-generated method stub 82 super.cleanup(context); 83 } 84 85 } 86 87 /** 88 * Driver Class 89 */ 90 91 // 专门抽取一个方法出来用于设置 92 public Job parseInputAndOutput(Tool tool,Configuration conf,String[] args) throws IOException 93 { 94 if (args.length!=2) { 95 System.err.printf("Usage:%s [generic options] <input> <output>\n", tool.getClass().getSimpleName()); 96 ToolRunner.printGenericCommandUsage(System.err); 97 return null; 98 } 99 100 //创建job,并设置配置信息和job名称 101 Job job=new Job(conf, ModuleMapReduce.class.getSimpleName()); 102 103 //设置job的运行类 104 // step 3:set job 105 // 1) set run jar class 106 job.setJarByClass(tool.getClass()); 107 108 // 14) job output path 109 FileOutputFormat.setOutputPath(job, new Path(args[1])); 110 111 return job; 112 } 113 114 @Override 115 public int run(String[] args) throws Exception { 116 117 // step 1:get conf 118 Configuration conf = new Configuration(); 119 120 // step 2:create job 121 Job job = parseInputAndOutput(this, conf, args); 122 123 124 // 2) set input format 125 job.setInputFormatClass(TextInputFormat.class); // 可省 126 127 // 3) set input path 128 FileInputFormat.addInputPath(job, new Path(args[0])); 129 130 // 4) set mapper class 131 job.setMapperClass(ModuleMapper.class); // 可省 132 133 // 5)set map input key/value class 134 job.setMapOutputKeyClass(LongWritable.class); // 可省 135 job.setMapOutputValueClass(Text.class); // 可省 136 137 // 6) set partitioner class 138 job.setPartitionerClass(HashPartitioner.class); // 可省 139 140 // 7) set reducer number 141 job.setNumReduceTasks(1);// default 1 //可省 142 143 // 8)set sort comparator class 144 //job.setSortComparatorClass(LongWritable.Comparator.class); // 可省 145 146 // 9) set group comparator class 147 //job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省 148 149 // 10) set combiner class 150 // job.setCombinerClass(null);默认是null,但是此处不能写 //可省 151 152 // 11) set reducer class 153 job.setReducerClass(ModuleReducer.class); // 可省 154 155 // 12) set output format 156 job.setOutputFormatClass(TextOutputFormat.class); // 可省 157 158 // 13) job output key/value class 159 job.setOutputKeyClass(LongWritable.class); // 可省 160 job.setOutputValueClass(Text.class); // 可省 161 162 163 // step 4: submit job 164 boolean isSuccess = job.waitForCompletion(true); 165 166 // step 5: return status 167 return isSuccess ? 0 : 1; 168 } 169 170 public static void main(String[] args) throws Exception { 171 172 args = new String[] { 173 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/", 174 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput" 175 }; 176 177 //run mapreduce 178 int status=ToolRunner.run(new ModuleMapReduce(), args); 179 180 //exit 181 System.exit(status); 182 } 183 }
View Module Code
模板使用步骤:
1) 改名称(MapReduce类的名称、Mapper类的名称、Reducer类的名称)
2) 依据实际的业务逻辑修改Mapper类和Reducer类的Key/Value输入输出参数的类型
3) 修改驱动Driver部分的Job的参数设置(Mapper类和Reducer类的输出类型)
4) 在Mapper类中编写实际的业务逻辑(setup()、map()、cleanup())
5) 在Reducer类中编写实际的业务逻辑(setup()、map()、cleanup())
6) 检查并修改驱动Driver代码(模板类中的run()方法)
7) 设置输入输出路径,进行MR测试。
使用ModuleMapReduce编写wordcount程序
1 package org.dragon.hadoop.mr.module; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.conf.Configured; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 22 /** 23 * 24 * ########################################### 25 * ############ MapReduce 模板类 ########## 26 * ########################################### 27 * 28 * @author ZhuXY 29 * @time 2016-3-13 下午10:21:06 30 * 31 */ 32 public class WordcountByModuleMapReduce extends Configured implements Tool { 33 34 /** 35 * Mapper Class 36 */ 37 public static class WordcountMapper extends 38 Mapper<LongWritable, Text, Text, LongWritable> { 39 40 @Override 41 protected void setup(Context context) throws IOException, 42 InterruptedException { 43 super.setup(context); 44 } 45 46 private Text word = new Text(); 47 private final static LongWritable one = new LongWritable(1); 48 49 @Override 50 protected void map(LongWritable key, Text value, Context context) 51 throws IOException, InterruptedException { 52 53 // 获取每行数据的值 54 String lineValue = value.toString(); 55 56 // 进行分割 57 StringTokenizer stringTokenizer = new StringTokenizer(lineValue); 58 59 // 遍历 60 while (stringTokenizer.hasMoreElements()) { 61 62 // 获取每个值 63 String worldValue = stringTokenizer.nextToken(); 64 65 // 设置map, 输入的key值 66 word.set(worldValue); 67 context.write(word, one); // 如果出现就出现一次,存在每行出现几次,这时候键的值一样,多个键值对 68 } 69 } 70 71 @Override 72 protected void cleanup(Context context) throws IOException, 73 InterruptedException { 74 super.cleanup(context); 75 } 76 } 77 78 /** 79 * Reducer Class 80 */ 81 public static class WordcountReducer extends 82 Reducer<Text, LongWritable, Text, LongWritable> { 83 private LongWritable resultLongWritable = new LongWritable(); 84 85 @Override 86 protected void setup(Context context) throws IOException, 87 InterruptedException { 88 // TODO Auto-generated method stub 89 super.setup(context); 90 } 91 92 @Override 93 protected void reduce(Text key, Iterable<LongWritable> values, 94 Context context) throws IOException, InterruptedException { 95 int sum = 0; 96 // 循环遍历Interable 97 for (LongWritable value : values) { 98 // 累加 99 sum += value.get(); 100 } 101 102 // 设置总次数 103 resultLongWritable.set(sum); 104 context.write(key, resultLongWritable); 105 } 106 107 @Override 108 protected void cleanup(Context context) throws IOException, 109 InterruptedException { 110 // TODO Auto-generated method stub 111 super.cleanup(context); 112 } 113 114 } 115 116 /** 117 * Driver Class 118 */ 119 120 // 专门抽取一个方法出来用于设置 121 public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) 122 throws IOException { 123 if (args.length != 2) { 124 System.err.printf("Usage:%s [generic options] <input> <output>\n", 125 tool.getClass().getSimpleName()); 126 ToolRunner.printGenericCommandUsage(System.err); 127 return null; 128 } 129 130 // 创建job,并设置配置信息和job名称 131 Job job = new Job(conf, 132 WordcountByModuleMapReduce.class.getSimpleName()); 133 134 // 设置job的运行类 135 // step 3:set job 136 // 1) set run jar class 137 job.setJarByClass(tool.getClass()); 138 139 // 14) job output path 140 FileOutputFormat.setOutputPath(job, new Path(args[1])); 141 142 return job; 143 } 144 145 @Override 146 public int run(String[] args) throws Exception { 147 148 // step 1:get conf 149 Configuration conf = new Configuration(); 150 151 // step 2:create job 152 Job job = parseInputAndOutput(this, conf, args); 153 154 // 2) set input format 155 job.setInputFormatClass(TextInputFormat.class); // 可省 156 157 // 3) set input path 158 FileInputFormat.addInputPath(job, new Path(args[0])); 159 160 // 4) set mapper class 161 job.setMapperClass(WordcountMapper.class); // 可省 162 163 // 5)set map input key/value class 164 job.setMapOutputKeyClass(Text.class); // 可省 165 job.setMapOutputValueClass(LongWritable.class); // 可省 166 167 // 6) set partitioner class 168 job.setPartitionerClass(HashPartitioner.class); // 可省 169 170 // 7) set reducer number 171 job.setNumReduceTasks(1);// default 1 //可省 172 173 // 8)set sort comparator class 174 // job.setSortComparatorClass(LongWritable.Comparator.class); // 可省 175 176 // 9) set group comparator class 177 // job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省 178 179 // 10) set combiner class 180 // job.setCombinerClass(null);默认是null,但是此处不能写 //可省 181 182 // 11) set reducer class 183 job.setReducerClass(WordcountReducer.class); // 可省 184 185 // 12) set output format 186 job.setOutputFormatClass(TextOutputFormat.class); // 可省 187 188 // 13) job output key/value class 189 job.setOutputKeyClass(Text.class); // 可省 190 job.setOutputValueClass(LongWritable.class); // 可省 191 192 // step 4: submit job 193 boolean isSuccess = job.waitForCompletion(true); 194 195 // step 5: return status 196 return isSuccess ? 0 : 1; 197 } 198 199 public static void main(String[] args) throws Exception { 200 201 args = new String[] { 202 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/", 203 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput" }; 204 205 // run mapreduce 206 int status = ToolRunner.run(new WordcountByModuleMapReduce(), args); 207 208 // exit 209 System.exit(status); 210 } 211 }
View WordcountByModuleMapReduce Code
转载于:https://www.cnblogs.com/xiangyangzhu/p/5281129.html
027_编写MapReduce的模板类Mapper、Reducer和Driver相关推荐
- 如何在Hadoop上编写MapReduce程序
1. 概述 1970年,IBM的研究员E.F.Codd博士在刊物<Communication of the ACM>上发表了一篇名为"A Relational Model of ...
- mapreduce编程实例python-Python编写MapReduce作业的简单示例
这篇文章主要为大家详细介绍了Python编写MapReduce作业的简单示例,具有一定的参考价值,可以用来参考一下. 对python这个高级语言感兴趣的小伙伴,下面一起跟随512笔记的小编两巴掌来看看 ...
- python hadoop streaming_如何在Hadoop中使用Streaming编写MapReduce(转帖)
作者:马士华 发表于:2008-03-05 12:51 最后更新于:2008-03-25 11:18 版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息. http://www ...
- 定义一个类mymath_C++:模板类
22.模板类 22.1 模板类 模板是泛型编程的基础,那什么是泛型编程呢?泛型编程是一种独立于任何特定数据类型编写代码的方式. C++标准模板库中的数据容器.迭代器和算法,都是泛型编程的例子,它们都使 ...
- C++模板类嵌套类内部类局部类的区别
模板类就是将类定义成模板的形式. C++中好像不区分内部类与嵌套类两个名词. 内部类与嵌套类都是指在类中定义类. 局部类是指在函数中定义类. (c++不能在函数中定义函数(python可以).c++在 ...
- 从汇编的眼光看C++(之递归函数与模板类)
[ 声明:版权所有,欢迎转载,请勿用于商业用途. 联系信箱:feixiaoxing @163.com] 递归,相信有过基本C语言经验的朋友都明白,就是函数自己调用自己.所以,本质上说,它和普通的函数 ...
- C++面向对象实现一个模板类链表
题目: 1. 请创建一个数据类型为T的链表类模板List,实现以下成员函数: 1) 默认构造函数List(),将该链表初始化为一个空链表(10分) 2) 拷贝构造函数List(constList ...
- C++使用模板类实现简单的人事管理系统
写在前面 这几天我一直在做我们<C++与数据结构>课程的大作业--c++人事管理系统.在写的过程中遇到了很多困难,也让我深刻地体会到"纸上得来终觉浅,绝知此事要躬行". ...
- c++模板类(一)理解编译器的编译模板过程
如何组织编写模板程序 前言 常遇到询问使用模板到底是否容易的问题,我的回答是:"模板的使用是容易的,但组织编写却不容易".看看我们几乎每天都能遇到的模板类吧,如STL, ATL, ...
最新文章
- 【PyTorch学习笔记】4:在Tensor上的索引和切片
- 问题 | CSDN编辑图像怎么使图像居中、偏左、偏右
- 为什么运行了java文件老是404_java – 为什么Spring MVC用404响应并报告“在...
- 利剑无意之scala小考核
- at命令不生效 linux_【干货】你不知道的 Linux 命令使用技巧
- 美股第三次熔断!一觉醒来,苹果损失了1.5亿部iPhone 11 Pro
- Win32中文件的操作
- php怎么代表不同行,php – 单击按钮时显示数据库的不同行
- mysql在官网下载完解压后安装
- redis常用内容信息:
- 最新的Scrum中文指南及更新
- TFTP协议下载服务器指定文件夹内的图片
- Java面试题及答案整理 2022 年 8 月最新版
- mysql 通达信公式_公式选股--均线黏合(更多公式关注公众号“斯达克逻辑”)...
- POJ 3322 BFS
- win10 系统版本号获取的三种方法
- Linux压缩包,解压缩包,vim,yum仓库,zip,用户管理
- 原神改文件换服务器,原神B服怎么转成官服
- 终极单词index 排序 K-L
- linux和乌班图和麒麟系统,UbuntuKylin 和麒麟系统是两支不同渠道的系统