MapReduce Feature Implementation 3 --- Top N
The MapReduce Feature Implementation series:
MapReduce Feature Implementation 1 — Converting Data Between HBase and HDFS
MapReduce Feature Implementation 2 — Sorting
MapReduce Feature Implementation 3 — Top N
MapReduce Feature Implementation 4 — A Small Exercise (Read from HBase, Aggregate, and Output the Top 3 to HDFS in Descending Order)
MapReduce Feature Implementation 5 — Distinct and Count
MapReduce Feature Implementation 6 — Max, Sum, and Avg
MapReduce Feature Implementation 7 — A Small Exercise (Chaining Multiple Jobs to Compute an Average)
MapReduce Feature Implementation 8 — Partition
MapReduce Feature Implementation 9 — PV and UV
MapReduce Feature Implementation 10 — Inverted Index
MapReduce Feature Implementation 11 — Join
I. Case 1
Create a file and upload it to HDFS:
[hadoop@h71 q1]$ vi test.txt
a 1000
b 2000
c 90000
d 88
e 999999
f 9998
g 13223
Note: the delimiter here is \t (a Tab), not a space.
[hadoop@h71 q1]$ hadoop fs -put test.txt /input
Java code:
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopK {
    public static final int K = 2;

    // The inner classes must be declared static, otherwise the job fails with:
    // Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: TopK1$KMap.<init>()
    public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.trim().length() > 0 && line.indexOf("\t") != -1) {
                String[] arr = line.split("\t", 2);
                String name = arr[0];
                Integer num = Integer.parseInt(arr[1]);
                map.put(num, name);
                if (map.size() > K) {
                    map.remove(map.firstKey()); // evict the smallest key, keeping only K entries
                }
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for (Integer num : map.keySet()) {
                context.write(new IntWritable(num), new Text(map.get(num)));
            }
        }
    }

    public static class KReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            map.put(key.get(), values.iterator().next().toString());
            if (map.size() > K) {
                map.remove(map.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for (Integer num : map.keySet()) {
//              context.write(new IntWritable(num), new Text(map.get(num)));
                context.write(new Text(map.get(num)), new IntWritable(num));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "my own word count");
        job.setJarByClass(TopK.class);
        job.setMapperClass(KMap.class);
        // The line below must stay commented out, otherwise the job fails with:
        // Error: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text
        // is not class org.apache.hadoop.io.IntWritable
//      job.setCombinerClass(KReduce.class);
        job.setReducerClass(KReduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.out.println(job.waitForCompletion(true));
    }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac TopK.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TopK*class
[hadoop@h71 q1]$ hadoop jar xx.jar TopK /input/test.txt /output
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
c 90000
e 999999
The program above relies on TreeMap; here is a small example of how it behaves:
import java.util.Map.Entry;
import java.util.TreeMap;

public class TreeMapDemo {
    public static void main(String[] args) {
        TreeMap<Long, Long> tree = new TreeMap<Long, Long>();
        tree.put(3333333L, 1333333L);
        tree.put(2222222L, 1222222L);
        tree.put(5555555L, 1555555L);
        tree.put(4444444L, 1444444L);
        for (Entry<Long, Long> entry : tree.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
        System.out.println("------------------------------------");
        System.out.println(tree.firstEntry().getValue());  // value of the smallest key
        System.out.println("------------------------------------");
        System.out.println(tree.lastEntry().getValue());   // value of the largest key
        System.out.println("------------------------------------");
        System.out.println(tree.navigableKeySet());        // keys in ascending order
        System.out.println("------------------------------------");
        System.out.println(tree.descendingKeySet());       // keys in descending order
    }
}
Output:
2222222:1222222
3333333:1333333
4444444:1444444
5555555:1555555
------------------------------------
1222222
------------------------------------
1555555
------------------------------------
[2222222, 3333333, 4444444, 5555555]
------------------------------------
[5555555, 4444444, 3333333, 2222222]
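The same TreeMap behavior is what drives the top-K logic in KMap and KReduce above: insert each value, then evict the smallest key once the map grows past K. A minimal plain-Java sketch of that pattern (the method and value names here are illustrative, not from the original code):

```java
import java.util.TreeMap;

public class TopKSketch {
    // Keep only the k largest numbers seen so far, exactly as KMap/KReduce do:
    // insert, then remove the smallest key while the map is over capacity.
    static TreeMap<Integer, String> topK(int k, int[] nums) {
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();
        for (int n : nums) {
            map.put(n, "item-" + n);
            if (map.size() > k) {
                map.remove(map.firstKey()); // evict the current minimum
            }
        }
        return map;
    }

    public static void main(String[] args) {
        // Same numbers as test.txt from Case 1.
        int[] nums = {1000, 2000, 90000, 88, 999999, 9998, 13223};
        System.out.println(topK(2, nums).keySet()); // prints [90000, 999999]
    }
}
```

Because a TreeMap keys on the number itself, duplicate values collapse into one entry; the MapReduce jobs above share that limitation.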
II. Case 2
Each line holds orderid (order number), userid (user), payment (amount paid), and productid (product number); find the top N payment values:
[hadoop@h71 q1]$ vi a.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
[hadoop@h71 q1]$ vi b.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39
[hadoop@h71 q1]$ hadoop fs -put a.txt /input
[hadoop@h71 q1]$ hadoop fs -put b.txt /input
1. Method 1:
MapReduce sorts keys in ascending order by default, so we define a custom key type, MyIntWritable, an integer that sorts in descending order:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Top {

    // An integer key type that sorts in descending order.
    public static class MyIntWritable implements WritableComparable<MyIntWritable> {
        private Integer num;

        public MyIntWritable(Integer num) {
            this.num = num;
        }

        public MyIntWritable() {
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(num);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.num = in.readInt();
        }

        @Override
        public int compareTo(MyIntWritable o) {
            // Negate the natural order so larger numbers sort first.
            int minus = this.num - o.num;
            return minus * (-1);
        }

        @Override
        public int hashCode() {
            return this.num.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof MyIntWritable)) { // the original was missing this negation
                return false;
            }
            MyIntWritable ok2 = (MyIntWritable) obj;
            return this.num.equals(ok2.num);       // compare values, not Integer references
        }

        @Override
        public String toString() {
            return num + "";
        }
    }

    public static class TopNMapper extends Mapper<LongWritable, Text, MyIntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.length() > 0) {
                // e.g. 1,9819,100,121
                String[] arr = line.split(",");
                if (arr.length == 4) {
                    int payment = Integer.parseInt(arr[2]);
                    context.write(new MyIntWritable(payment), new Text(""));
                }
            }
        }
    }

    public static class TopNReducer extends Reducer<MyIntWritable, Text, Text, MyIntWritable> {
        private int idx = 0;

        @Override
        protected void reduce(MyIntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            idx++;
            if (idx <= 5) {
                context.write(new Text(idx + ""), key);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = new Job(configuration, "topn_job");
        job.setJarByClass(Top.class);
        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(MyIntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(TopNReducer.class);
        // The reducer emits (Text, MyIntWritable); the original listing had these two reversed.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyIntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2. Method 2:
Java code (top 5 values of the third column):
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Top {

    public static class TopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        int len;
        int[] top;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            len = context.getConfiguration().getInt("N", 10);
            top = new int[len + 1];
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            if (arr != null && arr.length == 4) {
                int pay = Integer.parseInt(arr[2]);
                add(pay);
            }
        }

        // Overwrite slot 0 (the current minimum after sorting) and re-sort,
        // so top[1..len] always holds the largest values seen so far.
        public void add(int pay) {
            top[0] = pay;
            Arrays.sort(top);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 1; i <= len; i++) {
                context.write(new IntWritable(top[i]), new IntWritable(top[i]));
            }
        }
    }

    public static class TopNReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        int len;
        int[] top;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            len = context.getConfiguration().getInt("N", 10);
            top = new int[len + 1];
        }

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                add(val.get());
            }
        }

        public void add(int pay) {
            top[0] = pay;
            Arrays.sort(top);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = len; i > 0; i--) {
                context.write(new IntWritable(len - i + 1), new IntWritable(top[i]));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("N", 5);
        Job job = new Job(conf, "my own word count");
        job.setJarByClass(Top.class);
        job.setMapperClass(TopNMapper.class);
        job.setCombinerClass(TopNReduce.class);
        job.setReducerClass(TopNReduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.out.println(job.waitForCompletion(true));
    }
}
Notes on method 2:
When I first encountered MapReduce, my solution to the top-N problem was to put the (sorted) MapReduce output into a collection and take the first n elements. That approach is too naive: the size of a collection that fits in memory has an upper bound, and once the data volume grows it easily causes an out-of-memory error.
To take the top 5, define an array of length 6. For each record, the mapper writes the field being ranked into the first element of the array and calls Arrays.sort(array), which sorts the array in ascending order. For example, if the first record is 9000, the array becomes [0,0,0,0,0,9000]; if the second record is 8000, it becomes [0,0,0,0,8000,9000]; if the third is 8500, it becomes [0,0,0,8000,8500,9000]; and so on. Each time a number larger than the array's current minimum is inserted, the minimum is effectively overwritten, so the array always holds the largest values seen in the input so far.
The mapper then emits the array in order. The reducer receives the five sorted elements from each mapper, sorts them the same way, ending up with all the elements in ascending order, and emits them in reverse to produce the final result.
Compared with the earlier approach: before, the mappers did very little and the reducer sorted everything and took the first 5 records, so all the pressure fell on the reducer. It had to process the entire dataset, and since the number of reducers is usually small, a large input would overwhelm it. The new approach shifts the work to the mappers, which run in parallel across many machines, so no single machine carries the whole load. Each mapper emits only 5 elements; with 5 mappers, the reducer handles only 5*5 values, so out-of-memory errors no longer occur.
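The walkthrough above can be checked with a few lines of plain Java, independent of Hadoop (a sketch; the class name is mine):

```java
import java.util.Arrays;

public class ArrayTopN {
    // Insert a value into a fixed-size top-N buffer: overwrite slot 0
    // (the current minimum after sorting), then re-sort, as in method 2.
    static void add(int[] top, int pay) {
        top[0] = pay;
        Arrays.sort(top);
    }

    public static void main(String[] args) {
        int[] top = new int[6];                   // top 5 needs length 6
        add(top, 9000);
        System.out.println(Arrays.toString(top)); // [0, 0, 0, 0, 0, 9000]
        add(top, 8000);
        System.out.println(Arrays.toString(top)); // [0, 0, 0, 0, 8000, 9000]
        add(top, 8500);
        System.out.println(Arrays.toString(top)); // [0, 0, 0, 8000, 8500, 9000]
    }
}
```

Note that only indices 1..5 are ever emitted, so even when a value smaller than the current minimum lands in slot 0 and is later discarded, the reported top 5 stays correct.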
Run the program:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac Top.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar Top*class
[hadoop@h71 q1]$ hadoop jar xx.jar Top /input/* /output
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
1 9000
2 2000
3 1234
4 1000
5 910
III. Case 3
Generate random numbers: the program below creates 10 files, each containing one million random values in the Integer range; after generating them, copy the files and upload them to HDFS:
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Random;

public class Num_Generator {
    public static void main(String[] args) {
        FileOutputStream fos;
        OutputStreamWriter osw;
        BufferedWriter bw;
        Random random = new Random();
        String filename = "random_num";
        for (int i = 0; i < 10; i++) {
            String tmp_name = filename + "" + i + ".txt";
            File file = new File(tmp_name);
            try {
                fos = new FileOutputStream(file);
                osw = new OutputStreamWriter(fos, "UTF-8");
                bw = new BufferedWriter(osw);
                for (int j = 0; j < 1000000; j++) {
                    int rm = random.nextInt();
                    bw.write(rm + "");
                    bw.newLine();
                }
                bw.flush();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(i + ":Complete.");
        }
    }
}
The TopN program:
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TopN {

    public static class MyMapper extends Mapper<Object, Text, NullWritable, IntWritable> {
        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            System.out.println("Mapper(" + context.getConfiguration().getInt("N", 10) + "):in setup...");
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("Mapper(" + context.getConfiguration().getInt("N", 10) + "):in cleanup...");
            for (Integer text : tree.values()) {
                context.write(NullWritable.get(), new IntWritable(text));
            }
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String key_num = value.toString();
            int num = Integer.parseInt(key_num);
            tree.put(num, num);
            if (tree.size() > context.getConfiguration().getInt("N", 10)) {
                tree.remove(tree.firstKey());
            }
        }
    }

    public static class MyReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                tree.put(value.get(), value.get());
                if (tree.size() > context.getConfiguration().getInt("N", 10)) {
                    tree.remove(tree.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer val : tree.descendingKeySet()) {
                context.write(NullWritable.get(), new IntWritable(val));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: TopN <N> <input...> <output>");
            System.exit(2);
        }
        conf.setInt("N", Integer.parseInt(otherArgs[0]));
        System.out.println("N:" + otherArgs[0]);
        Job job = Job.getInstance(conf, "TopN");
        job.setJarByClass(TopN.class);
        job.setMapperClass(MyMapper.class);
//      job.setCombinerClass(MyReducer.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 1; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Question: the code calls context.getConfiguration().getInt("N", 10), and I could not see what the 10 was for; changing it to other values did not affect the result. The string "N", however, cannot be changed freely: the one used in the map and reduce functions must match the one in main. When I changed the string in main so it differed from the others, the output no longer followed the number set in main, but the default used elsewhere.
Answer: setInt(String name, int value) stores the pair (name, value) in the job's configuration. getInt(String name, int defaultValue) looks up the integer stored under name and returns defaultValue only if the lookup fails. So as long as main actually sets "N", the 10 is never used, which is why changing it has no effect.
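The defaultValue semantics can be illustrated with plain java.util.Properties; this is an analogy written by me, not Hadoop's actual Configuration code, but the lookup logic is the same idea:

```java
import java.util.Properties;

public class DefaultValueDemo {
    // Mirrors the contract of Configuration.getInt(name, defaultValue):
    // return the stored value when present and parseable, otherwise the default.
    static int getInt(Properties props, String name, int defaultValue) {
        String raw = props.getProperty(name);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("N", "7");                 // like conf.setInt("N", 7) in main
        System.out.println(getInt(props, "N", 10));  // 7 -- value is set, default ignored
        System.out.println(getInt(props, "M", 10));  // 10 -- nothing stored, default wins
    }
}
```

This also explains the mismatched-name experiment: a lookup under a name that main never set falls through to the default.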
First run Num_Generator.java and upload the results to HDFS:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac Num_Generator.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java Num_Generator
[hadoop@h71 q1]$ hadoop fs -put random_num*.txt /in
Then run TopN.java:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac TopN.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TopN*class
[hadoop@h71 q1]$ hadoop jar xx.jar TopN 7 /in/* /out
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
2147483639
2147483112
2147482692
2147482113
2147481936
2147481909
2147481535