2019独角兽企业重金招聘Python工程师标准>>>

上一章我们有讲到一个mapreduce案例——移动流量排序,如果我们要将最后的输出结果按总流量大小逆序输出,该怎么实现呢?本节博主将分享这个实现的过程。

一、分析

首先,要实现这个功能,我们可能会想到是否有办法将输出的结果先缓存起来,等执行完成后,在排序一起次性全部输出。是的,这的确是一个可以实现的思路;

我们可以启动一个reduce来处理,在reduce阶段中reduce()方法每次执行时,将key和value缓存到一个TreeMap里面,并且不执行输出;当reduce全部切片处理完成后,会调用一个cleanup()方法,且这个方法仅会被调用一次,我们可以在这个方法里面做排序输出。

上面的这种方式确实是可以实现,当是并不是很优雅;我们可以利用mapreduce自身的map阶段输出key的特性来实现,这个特性就是所有的key会按照key类comparable方法实现的实现去做排序输出。详细过程,我们可以将整个需求分成两个mapreduce过程来执行,第一个mapreduce就和之前的博客中一样只做统计流量,第二个mapreduce我们就用key的特性去实现排序。

二、实现方案(key特性实现方式)

FlowBean(流量统计bean类)

package com.empire.hadoop.mr.flowsort;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** 类 FlowBean.java的实现描述:流量统计bean类* * @author arron 2018年12月1日 下午10:59:42*/
public class FlowBean implements WritableComparable<FlowBean> {private long upFlow;private long dFlow;private long sumFlow;//反序列化时,需要反射调用空参构造函数,所以要显示定义一个public FlowBean() {}public FlowBean(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public void set(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getdFlow() {return dFlow;}public void setdFlow(long dFlow) {this.dFlow = dFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}/*** 序列化方法*/public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dFlow);out.writeLong(sumFlow);}/*** 反序列化方法 注意:反序列化的顺序跟序列化的顺序完全一致*/public void readFields(DataInput in) throws IOException {upFlow = in.readLong();dFlow = in.readLong();sumFlow = in.readLong();}public String toString() {return upFlow + "\t" + dFlow + "\t" + sumFlow;}public int compareTo(FlowBean o) {return this.sumFlow > o.getSumFlow() ? -1 : 1; //从大到小, 当前对象和要比较的对象比, 如果当前对象大, 返回-1, 交换他们的位置(自己的理解)}}

FlowCountSort(流量统计后的mapreduce排序实现主类)

package com.empire.hadoop.mr.flowsort;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954* 2070 类 FlowCountSort.java的实现描述:流量排序的mapreduce主实现类* * @author arron 2018年12月1日 下午11:00:07*/
public class FlowCountSort {static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {FlowBean bean = new FlowBean();Text     v    = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到的是上一个统计程序的输出结果,已经是各手机号的总流量信息String line = value.toString();String[] fields = line.split("\t");String phoneNbr = fields[0];long upFlow = Long.parseLong(fields[1]);long dFlow = Long.parseLong(fields[2]);bean.set(upFlow, dFlow);v.set(phoneNbr);context.write(bean, v);}}/*** 根据key来掉, 传过来的是对象, 每个对象都是不一样的, 所以每个对象都调用一次reduce方法* * @author: 张政* @date: 2016年4月11日 下午7:08:18* @package_name: day07.sample*/static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {// <bean(),phonenbr>@Overrideprotected void reduce(FlowBean bean, Iterable<Text> values, Context context)throws IOException, InterruptedException {context.write(values.iterator().next(), bean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/** conf.set("mapreduce.framework.name", "yarn");* conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/* job.setJar("/home/hadoop/wc.jar"); *///指定本程序的jar包所在的本地路径job.setJarByClass(FlowCountSort.class);//指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);//指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果所在目录Path outPath = new Path(args[1]);/** FileSystem fs = FileSystem.get(conf); if(fs.exists(outPath)){* fs.delete(outPath, true); }*/FileOutputFormat.setOutputPath(job, outPath);//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行/* job.submit(); */boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}

三、打包运行

#提交hadoop集群运行
hadoop jar flowsort_aaron.jar com.empire.hadoop.mr.flowsort.FlowCountSort /user/hadoop/flowcountount    /flowsort
#查看输出结果目录
hdfs dfs -ls /flowsort
#浏览输出结果
hdfs dfs -cat /flowsort/part-r-00000

运行效果图:

[hadoop@centos-aaron-h1 ~]$ hadoop jar flowsort_aaron.jar com.empire.hadoop.mr.flowsort.FlowCountSort /user/hadoop/flowcountount    /flowsort
18/12/02 07:10:46 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/12/02 07:10:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/02 07:10:48 INFO input.FileInputFormat: Total input files to process : 1
18/12/02 07:10:48 INFO mapreduce.JobSubmitter: number of splits:1
18/12/02 07:10:48 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/02 07:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543705650872_0001
18/12/02 07:10:50 INFO impl.YarnClientImpl: Submitted application application_1543705650872_0001
18/12/02 07:10:50 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543705650872_0001/
18/12/02 07:10:50 INFO mapreduce.Job: Running job: job_1543705650872_0001
18/12/02 07:11:00 INFO mapreduce.Job: Job job_1543705650872_0001 running in uber mode : false
18/12/02 07:11:00 INFO mapreduce.Job:  map 0% reduce 0%
18/12/02 07:11:10 INFO mapreduce.Job:  map 100% reduce 0%
18/12/02 07:11:23 INFO mapreduce.Job:  map 100% reduce 100%
18/12/02 07:11:23 INFO mapreduce.Job: Job job_1543705650872_0001 completed successfully
18/12/02 07:11:23 INFO mapreduce.Job: Counters: 49File System CountersFILE: Number of bytes read=801FILE: Number of bytes written=396695FILE: Number of read operations=0FILE: Number of large read operations=0FILE: Number of write operations=0HDFS: Number of bytes read=725HDFS: Number of bytes written=594HDFS: Number of read operations=6HDFS: Number of large read operations=0HDFS: Number of write operations=2Job Counters Launched map tasks=1Launched reduce tasks=1Data-local map tasks=1Total time spent by all maps in occupied slots (ms)=6980Total time spent by all reduces in occupied slots (ms)=8661Total time spent by all map tasks (ms)=6980Total time spent by all reduce tasks (ms)=8661Total vcore-milliseconds taken by all map tasks=6980Total vcore-milliseconds taken by all reduce tasks=8661Total megabyte-milliseconds taken by all map tasks=7147520Total megabyte-milliseconds taken by all reduce tasks=8868864Map-Reduce FrameworkMap input records=21Map output records=21Map output bytes=753Map output materialized bytes=801Input split bytes=131Combine input records=0Combine output records=0Reduce input groups=21Reduce shuffle bytes=801Reduce input records=21Reduce output records=21Spilled Records=42Shuffled Maps =1Failed Shuffles=0Merged Map outputs=1GC time elapsed (ms)=402CPU time spent (ms)=1890Physical memory (bytes) snapshot=342441984Virtual memory (bytes) snapshot=1694273536Total committed heap usage (bytes)=137867264Shuffle ErrorsBAD_ID=0CONNECTION=0IO_ERROR=0WRONG_LENGTH=0WRONG_MAP=0WRONG_REDUCE=0File Input Format Counters Bytes Read=594File Output Format Counters Bytes Written=594

运行结果:

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /flowsort
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-02 07:11 /flowsort/_SUCCESS
-rw-r--r--   2 hadoop supergroup        594 2018-12-02 07:11 /flowsort/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowsort/part-r-00000
13502468823     36675   551745  588420
13925057413     55290   241215  296505
13726238888     12405   123405  135810
13726230503     12405   123405  135810
18320173382     47655   12060   59715
13560439658     10170   29460   39630
13660577991     34800   3450    38250
15013685858     18295   17690   35985
13922314466     15040   18600   33640
15920133257     15780   14680   30460
84138413        20580   7160    27740
13602846565     9690    14550   24240
18211575961     7635    10530   18165
15989002119     9690    900     10590
13560436666     5580    4770    10350
13926435656     660     7560    8220
13480253104     900     900     1800
13826544101     1320    0       1320
13926251106     1200    0       1200
13760778710     600     600     1200
13719199419     1200    0       1200

四、最后总结

细心的小伙伴们从上的mapreduce主代码中肯定会看出和之前的写法有所差别,如下图所示:

此处我们之前都是在map方法里面去申明对象,那么之前的做法有什么问题呢?那就是之前的代码如果在数据很多的时候,我们在调用map的时候回创建很多个对象,有可能会导致我们内存溢出。但是,如果们向上面这样写,就只创建一个对象就够了,在map中设置相应的值,而后序列换输出去,然后依次重复前面的设置动作即可。注意,此处是因为我们mapreduce会做序列化输出,所以同一个对象序列化后只需的结果,并不影响。

最后寄语,以上是博主本次文章的全部内容,如果大家觉得博主的文章还不错,请点赞;如果您对博主其它服务器大数据技术或者博主本人感兴趣,请关注博主博客,并且欢迎随时跟博主沟通交流。

转载于:https://my.oschina.net/u/2371923/blog/2966201

大数据教程(9.1)流量汇总排序的mr实现相关推荐

  1. 大数据面试题及答案 汇总版

    版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/albg_boy/article/det ...

  2. 好程序员大数据教程:SparkShell和IDEA中编写Spark程序

    好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spa ...

  3. 大数据常见运维问题汇总

    大数据常见运维问题汇总 其他安装问题: 1.IDEA安装与配置 IntelliJ IDEA使用教程 (总目录篇)_请叫我大师兄-CSDN博客_intellijidea使用教程 2.IntelliJ I ...

  4. 好程序员大数据教程分享之Hadoop优缺点

    好程序员大数据教程分享之Hadoop优缺点,大数据成为时代主流,开启时代的大门,全球43亿部电话.20亿位互联网用户每秒都在不断地产生大量数据,人们发送短信给朋友.上传视频.用手机拍照.更新社交网站的 ...

  5. mysql 序列自增长 恢复到1_大数据教程分享MySQL数据库约束条件和自增长序列

    大数据教程分享MySQL数据库约束条件和自增长序列,一.约束(constraint) 约束就是在表上强制执行的一种校验规则,当执行DML操作时,数据必须符合这些规则,如果不符合,将无法执行. 约束的全 ...

  6. 【学习笔记】尚硅谷Hadoop大数据教程笔记

    本文是尚硅谷Hadoop教程的学习笔记,由于个人的需要,只致力于搞清楚Hadoop是什么,它可以解决什么问题,以及它的原理是什么.至于具体怎么安装.使用和编写代码不在我考虑的范围内. 一.Hadoop ...

  7. 大数据实战:用户流量分析系统

    ---------------------------------------------------------------------------------------------------- ...

  8. 【干货】全球大数据领域顶级开源工具汇总

    大数据技术从业人员必读 一.Hadoop相关工具 1. Hadoop Apache的Hadoop项目已几乎与大数据划上了等号.它不断壮大起来,已成为一个完整的生态系统,众多开源工具面向高度扩展的分布式 ...

  9. 大数据综合项目--网站流量日志数据分析系统(详细步骤和代码)

    文章目录 前言: 基本概述 Sqoop概述 什么是Sqoop Flume概述 什么是Flume 为什么需要flume HIve概述 什么是Hive 系统背景: 模块开发 数据采集 使用Flume搭建日 ...

最新文章

  1. JDBC executeBatch 抛出异常停止
  2. python好学吗 老程序员-今天面试了一个34岁大龄程序员,有感而发
  3. 9 个可以快速掌握的 Java 性能调优技巧
  4. JavaWeb-综合案例(用户信息)-学习笔记06【复杂条件查询功能】
  5. 自适应页面 移动端获取焦点自动放大_专业 | 惠检LIMS系统:手机移动应用(MA)...
  6. SpringBoot约定大于配置的特性解读 SpringBoot快速入门
  7. [转]google protobuf安装与使用
  8. centos6.5 install cobbler
  9. 蓝桥杯2017年第八届C/C++省赛C组第二题-兴趣小组
  10. python将数据保存为pdf
  11. win10备份为wim_在PE中使用CGI进行系统备份和还原
  12. ZUCC计算机网络 网络安全
  13. Keras:我的第一个LSTM二分类网络模型
  14. 雷达为什么要进行脉冲压缩
  15. 服务端使用GZIP压缩数据
  16. 如何与低智商的人相处?
  17. 2021最新微信影视小程序源码无限代开+搭建详细教程
  18. 关于Dell r720重装系统时遇到的识别不到磁盘的问题
  19. 指数增强策略(股票)
  20. c语言代码大全表解释_C语言常用错误代码释义大全,值得收藏!

热门文章

  1. .net采集网页方法大全(5种)
  2. Object-C---gt;Swift之(八)类和结构体
  3. 调优之系统篇--cpu,内存
  4. html5关于定位功能的实现
  5. GNU make manual 翻译( 一百二十一)
  6. 一个页面标题和过滤输出的解决方案(下)
  7. .NET Core实战项目之CMS 第十二章 开发篇-Dapper封装CURD及仓储代码生成器实现...
  8. json.net使用说明一
  9. PLUICameraViewController 拍照页面崩溃
  10. 报告 | 2018中国区块链行业分析报告