模板类编写好后写MapReduce程序,的模板类编写好以后只需要改参数就行了,代码如下:

  1 package org.dragon.hadoop.mr.module;
  2
  3 import java.io.IOException;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Reducer;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 18 import org.apache.hadoop.util.Tool;
 19 import org.apache.hadoop.util.ToolRunner;
 20
 21 /**
 22  *
 23  * ###########################################
 24  * ############     MapReduce 模板类        ##########
 25  * ###########################################
 26  *
 27  * @author ZhuXY
 28  * @time 2016-3-13 下午10:21:06
 29  *
 30  */
 31 public class ModuleMapReduce extends Configured implements Tool {
 32
 33     /**
 34      * Mapper Class
 35      */
 36     public static class ModuleMapper extends
 37             Mapper<LongWritable, Text, LongWritable, Text> {
 38
 39         @Override
 40         protected void setup(Context context) throws IOException,
 41                 InterruptedException {
 42             super.setup(context);
 43         }
 44
 45         @Override
 46         protected void map(LongWritable key, Text value, Context context)
 47                 throws IOException, InterruptedException {
 48             super.map(key, value, context);
 49         }
 50
 51         @Override
 52         protected void cleanup(Context context) throws IOException,
 53                 InterruptedException {
 54             super.cleanup(context);
 55         }
 56     }
 57
 58     /**
 59      * Reducer Class
 60      */
 61     public static class ModuleReducer extends
 62             Reducer<LongWritable, Text, LongWritable, Text> {
 63
 64         @Override
 65         protected void setup(Context context) throws IOException,
 66                 InterruptedException {
 67             // TODO Auto-generated method stub
 68             super.setup(context);
 69         }
 70
 71         @Override
 72         protected void reduce(LongWritable key, Iterable<Text> values,
 73                 Context context) throws IOException, InterruptedException {
 74             // TODO Auto-generated method stub
 75             super.reduce(key, values, context);
 76         }
 77
 78         @Override
 79         protected void cleanup(Context context) throws IOException,
 80                 InterruptedException {
 81             // TODO Auto-generated method stub
 82             super.cleanup(context);
 83         }
 84
 85     }
 86
 87     /**
 88      * Driver Class
 89      */
 90
 91     //    专门抽取一个方法出来用于设置
 92     public Job parseInputAndOutput(Tool tool,Configuration conf,String[] args) throws IOException
 93     {
 94         if (args.length!=2) {
 95             System.err.printf("Usage:%s [generic options] <input> <output>\n", tool.getClass().getSimpleName());
 96             ToolRunner.printGenericCommandUsage(System.err);
 97             return null;
 98         }
 99
100         //创建job,并设置配置信息和job名称
101         Job job=new Job(conf, ModuleMapReduce.class.getSimpleName());
102
103         //设置job的运行类
104         // step 3:set job
105         // 1) set run jar class
106         job.setJarByClass(tool.getClass());
107
108         // 14) job output path
109         FileOutputFormat.setOutputPath(job, new Path(args[1]));
110
111         return job;
112     }
113
114     @Override
115     public int run(String[] args) throws Exception {
116
117         // step 1:get conf
118         Configuration conf = new Configuration();
119
120         // step 2:create job
121         Job job = parseInputAndOutput(this, conf, args);
122
123
124         // 2) set input format
125         job.setInputFormatClass(TextInputFormat.class); // 可省
126
127         // 3) set input path
128         FileInputFormat.addInputPath(job, new Path(args[0]));
129
130         // 4) set mapper class
131         job.setMapperClass(ModuleMapper.class); // 可省
132
133         // 5)set map input key/value class
134         job.setMapOutputKeyClass(LongWritable.class); // 可省
135         job.setMapOutputValueClass(Text.class); // 可省
136
137         // 6) set partitioner class
138         job.setPartitionerClass(HashPartitioner.class); // 可省
139
140         // 7) set reducer number
141         job.setNumReduceTasks(1);// default 1 //可省
142
143         // 8)set sort comparator class
144         //job.setSortComparatorClass(LongWritable.Comparator.class); // 可省
145
146         // 9) set group comparator class
147         //job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省
148
149         // 10) set combiner class
150         // job.setCombinerClass(null);默认是null,但是此处不能写 //可省
151
152         // 11) set reducer class
153         job.setReducerClass(ModuleReducer.class); // 可省
154
155         // 12) set output format
156         job.setOutputFormatClass(TextOutputFormat.class); // 可省
157
158         // 13) job output key/value class
159         job.setOutputKeyClass(LongWritable.class); // 可省
160         job.setOutputValueClass(Text.class); // 可省
161
162
163         // step 4: submit job
164         boolean isSuccess = job.waitForCompletion(true);
165
166         // step 5: return status
167         return isSuccess ? 0 : 1;
168     }
169
170     public static void main(String[] args) throws Exception {
171
172         args = new String[] {
173                 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/",
174                 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput"
175                 };
176
177         //run mapreduce
178         int status=ToolRunner.run(new ModuleMapReduce(), args);
179
180         //exit
181         System.exit(status);
182     }
183 }

View Module Code

模板使用步骤:

1) 改名称(MapReduce类的名称、Mapper类的名称、Reducer类的名称)

2) 依据实际的业务逻辑修改Mapper类和Reducer类的Key/Value输入输出参数的类型

3) 修改驱动Driver部分的Job的参数设置(Mapper类和Reducer类的输出类型)

4) 在Mapper类中编写实际的业务逻辑(setup()、map()、cleanup())

5) 在Reducer类中编写实际的业务逻辑(setup()、map()、cleanup())

6) 检查并修改驱动Driver代码(模板类中的run()方法)

7) 设置输入输出路径,进行MR测试。

使用ModuleMapReduce编写wordcount程序

  1 package org.dragon.hadoop.mr.module;
  2
  3 import java.io.IOException;
  4 import java.util.StringTokenizer;
  5
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.conf.Configured;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19 import org.apache.hadoop.util.Tool;
 20 import org.apache.hadoop.util.ToolRunner;
 21
 22 /**
 23  *
 24  * ###########################################
 25  * ############        MapReduce 模板类     ##########
 26  * ###########################################
 27  *
 28  * @author ZhuXY
 29  * @time 2016-3-13 下午10:21:06
 30  *
 31  */
 32 public class WordcountByModuleMapReduce extends Configured implements Tool {
 33
 34     /**
 35      * Mapper Class
 36      */
 37     public static class WordcountMapper extends
 38             Mapper<LongWritable, Text, Text, LongWritable> {
 39
 40         @Override
 41         protected void setup(Context context) throws IOException,
 42                 InterruptedException {
 43             super.setup(context);
 44         }
 45
 46         private Text word = new Text();
 47         private final static LongWritable one = new LongWritable(1);
 48
 49         @Override
 50         protected void map(LongWritable key, Text value, Context context)
 51                 throws IOException, InterruptedException {
 52
 53             // 获取每行数据的值
 54             String lineValue = value.toString();
 55
 56             // 进行分割
 57             StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
 58
 59             // 遍历
 60             while (stringTokenizer.hasMoreElements()) {
 61
 62                 // 获取每个值
 63                 String worldValue = stringTokenizer.nextToken();
 64
 65                 // 设置map, 输入的key值
 66                 word.set(worldValue);
 67                 context.write(word, one); // 如果出现就出现一次,存在每行出现几次,这时候键的值一样,多个键值对
 68             }
 69         }
 70
 71         @Override
 72         protected void cleanup(Context context) throws IOException,
 73                 InterruptedException {
 74             super.cleanup(context);
 75         }
 76     }
 77
 78     /**
 79      * Reducer Class
 80      */
 81     public static class WordcountReducer extends
 82             Reducer<Text, LongWritable, Text, LongWritable> {
 83         private LongWritable resultLongWritable = new LongWritable();
 84
 85         @Override
 86         protected void setup(Context context) throws IOException,
 87                 InterruptedException {
 88             // TODO Auto-generated method stub
 89             super.setup(context);
 90         }
 91
 92         @Override
 93         protected void reduce(Text key, Iterable<LongWritable> values,
 94                 Context context) throws IOException, InterruptedException {
 95             int sum = 0;
 96             // 循环遍历Interable
 97             for (LongWritable value : values) {
 98                 // 累加
 99                 sum += value.get();
100             }
101
102             // 设置总次数
103             resultLongWritable.set(sum);
104             context.write(key, resultLongWritable);
105         }
106
107         @Override
108         protected void cleanup(Context context) throws IOException,
109                 InterruptedException {
110             // TODO Auto-generated method stub
111             super.cleanup(context);
112         }
113
114     }
115
116     /**
117      * Driver Class
118      */
119
120     // 专门抽取一个方法出来用于设置
121     public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
122             throws IOException {
123         if (args.length != 2) {
124             System.err.printf("Usage:%s [generic options] <input> <output>\n",
125                     tool.getClass().getSimpleName());
126             ToolRunner.printGenericCommandUsage(System.err);
127             return null;
128         }
129
130         // 创建job,并设置配置信息和job名称
131         Job job = new Job(conf,
132                 WordcountByModuleMapReduce.class.getSimpleName());
133
134         // 设置job的运行类
135         // step 3:set job
136         // 1) set run jar class
137         job.setJarByClass(tool.getClass());
138
139         // 14) job output path
140         FileOutputFormat.setOutputPath(job, new Path(args[1]));
141
142         return job;
143     }
144
145     @Override
146     public int run(String[] args) throws Exception {
147
148         // step 1:get conf
149         Configuration conf = new Configuration();
150
151         // step 2:create job
152         Job job = parseInputAndOutput(this, conf, args);
153
154         // 2) set input format
155         job.setInputFormatClass(TextInputFormat.class); // 可省
156
157         // 3) set input path
158         FileInputFormat.addInputPath(job, new Path(args[0]));
159
160         // 4) set mapper class
161         job.setMapperClass(WordcountMapper.class); // 可省
162
163         // 5)set map input key/value class
164         job.setMapOutputKeyClass(Text.class); // 可省
165         job.setMapOutputValueClass(LongWritable.class); // 可省
166
167         // 6) set partitioner class
168         job.setPartitionerClass(HashPartitioner.class); // 可省
169
170         // 7) set reducer number
171         job.setNumReduceTasks(1);// default 1 //可省
172
173         // 8)set sort comparator class
174         // job.setSortComparatorClass(LongWritable.Comparator.class); // 可省
175
176         // 9) set group comparator class
177         // job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省
178
179         // 10) set combiner class
180         // job.setCombinerClass(null);默认是null,但是此处不能写 //可省
181
182         // 11) set reducer class
183         job.setReducerClass(WordcountReducer.class); // 可省
184
185         // 12) set output format
186         job.setOutputFormatClass(TextOutputFormat.class); // 可省
187
188         // 13) job output key/value class
189         job.setOutputKeyClass(Text.class); // 可省
190         job.setOutputValueClass(LongWritable.class); // 可省
191
192         // step 4: submit job
193         boolean isSuccess = job.waitForCompletion(true);
194
195         // step 5: return status
196         return isSuccess ? 0 : 1;
197     }
198
199     public static void main(String[] args) throws Exception {
200
201         args = new String[] {
202                 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/",
203                 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput" };
204
205         // run mapreduce
206         int status = ToolRunner.run(new WordcountByModuleMapReduce(), args);
207
208         // exit
209         System.exit(status);
210     }
211 }

View WordcountByModuleMapReduce Code

转载于:https://www.cnblogs.com/xiangyangzhu/p/5281129.html

027_编写MapReduce的模板类Mapper、Reducer和Driver相关推荐

  1. 如何在Hadoop上编写MapReduce程序

    1. 概述 1970年,IBM的研究员E.F.Codd博士在刊物<Communication of the ACM>上发表了一篇名为"A Relational Model of ...

  2. mapreduce编程实例python-Python编写MapReduce作业的简单示例

    这篇文章主要为大家详细介绍了Python编写MapReduce作业的简单示例,具有一定的参考价值,可以用来参考一下. 对python这个高级语言感兴趣的小伙伴,下面一起跟随512笔记的小编两巴掌来看看 ...

  3. python hadoop streaming_如何在Hadoop中使用Streaming编写MapReduce(转帖)

    作者:马士华 发表于:2008-03-05 12:51 最后更新于:2008-03-25 11:18 版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息. http://www ...

  4. 定义一个类mymath_C++:模板类

    22.模板类 22.1 模板类 模板是泛型编程的基础,那什么是泛型编程呢?泛型编程是一种独立于任何特定数据类型编写代码的方式. C++标准模板库中的数据容器.迭代器和算法,都是泛型编程的例子,它们都使 ...

  5. C++模板类嵌套类内部类局部类的区别

    模板类就是将类定义成模板的形式. C++中好像不区分内部类与嵌套类两个名词. 内部类与嵌套类都是指在类中定义类. 局部类是指在函数中定义类. (c++不能在函数中定义函数(python可以).c++在 ...

  6. 从汇编的眼光看C++(之递归函数与模板类)

    [ 声明:版权所有,欢迎转载,请勿用于商业用途.  联系信箱:feixiaoxing @163.com] 递归,相信有过基本C语言经验的朋友都明白,就是函数自己调用自己.所以,本质上说,它和普通的函数 ...

  7. C++面向对象实现一个模板类链表

    题目: 1.    请创建一个数据类型为T的链表类模板List,实现以下成员函数: 1) 默认构造函数List(),将该链表初始化为一个空链表(10分) 2) 拷贝构造函数List(constList ...

  8. C++使用模板类实现简单的人事管理系统

    写在前面 这几天我一直在做我们<C++与数据结构>课程的大作业--c++人事管理系统.在写的过程中遇到了很多困难,也让我深刻地体会到"纸上得来终觉浅,绝知此事要躬行". ...

  9. c++模板类(一)理解编译器的编译模板过程

    如何组织编写模板程序 前言 常遇到询问使用模板到底是否容易的问题,我的回答是:"模板的使用是容易的,但组织编写却不容易".看看我们几乎每天都能遇到的模板类吧,如STL, ATL, ...

最新文章

  1. 【PyTorch学习笔记】4:在Tensor上的索引和切片
  2. 问题 | CSDN编辑图像怎么使图像居中、偏左、偏右
  3. 为什么运行了java文件老是404_java – 为什么Spring MVC用404响应并报告“在...
  4. 利剑无意之scala小考核
  5. at命令不生效 linux_【干货】你不知道的 Linux 命令使用技巧
  6. 美股第三次熔断!一觉醒来,苹果损失了1.5亿部iPhone 11 Pro
  7. Win32中文件的操作
  8. php怎么代表不同行,php – 单击按钮时显示数据库的不同行
  9. mysql在官网下载完解压后安装
  10. redis常用内容信息:
  11. 最新的Scrum中文指南及更新
  12. TFTP协议下载服务器指定文件夹内的图片
  13. Java面试题及答案整理 2022 年 8 月最新版
  14. mysql 通达信公式_公式选股--均线黏合(更多公式关注公众号“斯达克逻辑”)...
  15. POJ 3322 BFS
  16. win10 系统版本号获取的三种方法
  17. Linux压缩包,解压缩包,vim,yum仓库,zip,用户管理
  18. 原神改文件换服务器,原神B服怎么转成官服
  19. 终极单词index 排序 K-L
  20. linux和乌班图和麒麟系统,UbuntuKylin 和麒麟系统是两支不同渠道的系统

热门文章

  1. linux目录/etc/nc.d/nc.local开机启动项无效
  2. JAVA8中Set排序四种写法
  3. Apollo添加部门
  4. linux下安装minio
  5. Kotlin入门教程——目录索引
  6. Android开发笔记(八十)运行状态检查
  7. zip安装mysql没有软件,windows10系统安装mysql-8.0.13(zip安装) 的教程详解
  8. aⅴgo安装包下载_Mysql 安装
  9. C# 数据库连接字符串拼接
  10. 写了10年Javascript未必全了解的标识符顺序