[toc]


MapReduce程序之数据排序

需求

下面有三个文件:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file1.csv
2
32
654
32
15
756
65223
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file2.csv
5956
22
650
92
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file3.csv
26
54
6

使用MapReduce对其进行排序并输出。

分析思路

Map阶段分析:
/*** 数据在Map之后会做sort(从内存缓冲区到磁盘的时候会做sort),所以Map操作只需要把数据直接写出即可,最后在本地做数据* 合并的时候也是会有排序的,详细可以参考MapReduce的过程,但是需要注意的是,因为我们需要进行的是数字的排序,* 所以在Map输出时,key的类型应该是Int类型才能按照数字的方式进行排序,如果是Text文本的话,那么是按照字典顺序* 来进行排序的(也就是先比较字符串中的第一个字符,如果相同再比较第二个字符,以此类推),而不是按照数字进行排序*/Reduce阶段分析:
/*** 需要注意的是,排序与其它MapReduce程序有所不同,最后在驱动程序设置ReduceTask时,必须要设置为1* 这样才能把数据都汇总到一起,另外一点,数据在shuffle到达reducer的时候,从内存缓冲区写到磁盘时* 也会进行排序操作,所以即便是从不同节点上的Map上shuffle来的数据,到输入到reducer时,数据也是有序的,* 所以Reducer需要做的是把数据直接写到context中就可以了*/ 

MapReduce程序

关于如何进行数据的排序,思路已经在代码注释中有说明,不过需要注意的是,这里使用了前面开发的Job工具类来开发驱动程序,程序代码如下:

package com.uplooking.bigdata.mr.sort;import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import com.uplooking.bigdata.mr.duplication.DuplicationJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class SortJob {/*** 驱动程序,使用工具类来生成job** @param args*/public static void main(String[] args) throws Exception {if (args == null || args.length < 2) {System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");System.exit(-1);}Job job = MapReduceJobUtil.buildJob(new Configuration(),SortJob.class,args[0],TextInputFormat.class,SortMapper.class,IntWritable.class,NullWritable.class,new Path(args[1]),TextOutputFormat.class,SortReducer.class,IntWritable.class,NullWritable.class);// ReduceTask必须设置为1job.setNumReduceTasks(1);job.waitForCompletion(true);}/*** 数据在Map之后会做sort(从内存缓冲区到磁盘的时候会做sort),所以Map操作只需要把数据直接写出即可,最后在本地做数据* 合并的时候也是会有排序的,详细可以参考MapReduce的过程,但是需要注意的是,因为我们需要进行的是数字的排序,* 所以在Map输出时,key的类型应该是Int类型才能按照数字的方式进行排序,如果是Text文本的话,那么是按照字典顺序* 来进行排序的(也就是先比较字符串中的第一个字符,如果相同再比较第二个字符,以此类推),而不是按照数字进行排序*/public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 先将value转换为数字int num = Integer.valueOf(value.toString());// 直接写出数据到context中context.write(new IntWritable(num), NullWritable.get());}}/*** 需要注意的是,排序与其它MapReduce程序有所不同,最后在驱动程序设置ReduceTask时,必须要设置为1* 这样才能把数据都汇总到一起,另外一点,数据在shuffle到达reducer的时候,从内存缓冲区写到磁盘时* 也会进行排序操作,所以即便是从不同节点上的Map上shuffle来的数据,到输入到reducer时,数据也是有序的,* 所以Reducer需要做的是把数据直接写到context中就可以了*/public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {// 直接将数据写入到context中context.write(key, NullWritable.get());}}/*** 仍然需要说明的是,因为reduce端在shuffle数据写入到磁盘的时候已经完成了排序,* 而这个排序的操作不是在reducer的输出中完成的,这也就意味着,reducer的输出数据中的key数据类型,* 可以是IntWritable,显然也可以设置为Text的,说明这个问题主要是要理清map-shuffle-reduce的过程*/
}

测试

这里使用本地环境来运行MapReduce程序,输入的参数如下:

/Users/yeyonghao/data/input/sort /Users/yeyonghao/data/output/mr/sort

也可以将其打包成jar包,然后上传到Hadoop环境中运行。

运行程序后,查看输出结果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/sort$ cat part-r-00000
2
6
15
22
26
32
54
92
650
654
756
5956
65223

可以看到,我们的MapReduce已经完成了数据排序的操作。

注意事项

因为在map输出后,相同的key会被shuffle到同一个reducer中,所以这个过程其实也完成了去重的操作,这也就意味着,按照上面的MapReduce程序的思路,重复的数据也会被删除,那么如何解决这个问题呢?大家可以思考一下。

思路也比较简单,可以这样做,map输出的时候,key还是原来的key,而value不再是NullWritalbe,而是跟key一样的,这样到了reducer的时候,如果有相同的数据,输入的数据就类似于<32, [32, 32, 32]>,那么在reducer输出数据的时候,就可以迭代[32, 32, 32]数据进行输出,这样就可以避免shuffle阶段key去重所带来去除了相同数字的问题。

注意,前面一些文章,很多操作多次提到是在shuffle,其实有些是不准备的,包括这里的。在Map端,其实key的去重是在merge on disk的过程中完成了。

MapReduce程序之数据排序相关推荐

  1. 大数据日志分析项目mapreduce程序

    总体思路: 使用flume将服务器上的日志传到hadoop上面,然后使用mapreduce程序完成数据清洗,统计pv,visit模型.最后使用azkaban定时执行程序. 用户每次登录根据sessio ...

  2. Hadoop集群大数据解决方案之MapReduce 程序实战进阶(自定义partitionsortgroup)(六)

    准 备   在上一篇博客举了个简单的word count,重在说明mapreduce的流程,但是针对mapreduce的编程,程序员能控制的,远远不止map和reduce,还有诸如partition, ...

  3. 使用MapReduce程序完成相关数据预处理(二)

    使用MapReduce程序完成相关数据预处理(二) 数据大概有2万条左右所以部分截取 (格式为csv) 1月20日,北京,大兴区,2,0,0,北京市大兴区卫健委,https://m.weibo.cn/ ...

  4. ## 使用MapReduce程序完成相关数据预处理

    使用MapReduce程序完成相关数据预处理 数据大概有2万条左右所以部分截取 (格式为csv) 1月20日,北京,大兴区,2,0,0,北京市大兴区卫健委,https://m.weibo.cn/270 ...

  5. hadoop大数据——mapreduce程序提交运行模式及debug方法

    本地运行模式 (1)mapreduce程序是被提交给LocalJobRunner在本地运行 (2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上 怎样实现本地运行?:写一个程序,不要带集 ...

  6. Hadoop之MapReduce程序应用一读取专利引用数据集并对它进行倒排

    摘要:MapReduce程序处理专利数据集. 关键词:MapReduce程序   专利数据集 数据源:专利引用数据集cite75_99.txt.(该数据集可以从网址http://www.nber.or ...

  7. [python]使用python实现Hadoop MapReduce程序:计算一组数据的均值和方差

    这是参照<机器学习实战>中第15章"大数据与MapReduce"的内容,因为作者写作时hadoop版本和现在的版本相差很大,所以在Hadoop上运行python写的Ma ...

  8. 微信小程序数据排序和倒序reverse()

    var a = [3,7,2]; 微信小程序数据排序,先将数据按从小到大排序 //定义compare函数,参数name是对象的某一个属性,比如age.salary //返回一个可以用来对包含该成员的对 ...

  9. c语言双重for循环流程图_使用C语言编写程序对数据进行排序

    使用C语言编写程序对数据进行排序 C语言是一种强大的编程软件,使用十分广泛,用户众多,也是学习其他语言的基础.我作为一个C语言忠实粉丝,以一些常见的C程序来剖析C语言的格式和结构,希望能够为广大初学者 ...

最新文章

  1. iOS iTunes Connect协议更新导致无法构建新版本
  2. IE6使用png透明图片的方法
  3. python读文件的三个方法read()、readline()、readlines()详解
  4. Docker Compose安装Registry后配置WebUI与客户端
  5. boost::container模块实现普通容器的程序
  6. Docker私有仓库管理,删除本地仓库中的镜像
  7. 基于事件驱动架构构建微服务第7部分:在仓储上实现事件溯源
  8. 公众号发布代码最好的工具markdown语法
  9. kafka0.9 java commit_Kafka 0.9 新消费者API
  10. C++ 拷贝构造函数
  11. CVE-2020-16875: Microsoft Exchange远程代码执行漏洞通告
  12. 非模态对话框的创建于销毁
  13. python下载速度显示_Python获取下载速度并显示进度条
  14. CAM350 V14.5安装记录
  15. 机器学习文本特征提取
  16. 对比分析折半查找与Fibonacci查找算法
  17. Blazor发布问题,localhost可以访问,局域网无法访问
  18. angular : 自定义组件双向绑定 [(ngModel)]
  19. python 头条 上传_基于Python的免费新闻头条接口查询
  20. 小白月赛26:E牛牛走迷宫(BFS)

热门文章

  1. Iterator迭代器接口讲解
  2. 斗地主综合案例之有序版本
  3. No database support: No database YAML file
  4. Postfix实现代理Exchange邮件传输方案
  5. 常见的爬虫分析库(1)-Python3中Urllib库基本使用
  6. 【代码笔记】Web-HTML-列表
  7. ZooKeeper之(四)配置与命令
  8. CAD输出的局部平面坐标数据配准转换到WGS84坐标系
  9. linux文件夹重命名busy,Linux下执行程序出现 Text file busy 提示时的解决方法
  10. 蚂蚁资深算法专家周俊:从原理到落地,支付宝如何打造保护隐私的共享智能?...