MapReduce in Practice (3): Implementing Partitioning
Requirement:
Building on Part (1), implement a custom partitioning mechanism. For example, assign phone numbers to different provinces by their prefix, process each province on a different reducer, and write each province's result to its own file.
In other words: run traffic statistics over the raw traffic logs and output each province's per-user totals to a separate file.
Approach:
Two mechanisms need to be customized:
1. The partitioning logic: write a custom Partitioner that decides which partition each record belongs to.
2. The number of concurrent reduce tasks, so that multiple reducers run at the same time.
The project layout is as follows:
AreaPartitioner.java:
package cn.darrenchan.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    /**
     * The mapping is hard-coded here for simplicity; in practice this could
     * query a database that returns the province code for a phone prefix.
     */
    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key, look up its prefix in the
        // dictionary, and return a different partition number per province.
        // Unknown prefixes fall into the catch-all partition 5.
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }
}
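Before wiring the partitioner into a job, it can help to sanity-check the prefix lookup on its own. Below is a minimal standalone sketch, with no Hadoop dependencies, that mirrors the lookup in getPartition; the class name PrefixLookupDemo is hypothetical and not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (no Hadoop dependencies) of the same prefix lookup
// used by AreaPartitioner.getPartition.
public class PrefixLookupDemo {
    private static final Map<String, Integer> AREA_MAP = new HashMap<>();
    static {
        AREA_MAP.put("135", 0);
        AREA_MAP.put("136", 1);
        AREA_MAP.put("137", 2);
        AREA_MAP.put("138", 3);
        AREA_MAP.put("139", 4);
    }

    // Known prefixes get their own partition; everything else falls
    // into the catch-all partition 5.
    static int partitionFor(String phoneNum) {
        Integer code = AREA_MAP.get(phoneNum.substring(0, 3));
        return code == null ? 5 : code;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("13502468823")); // prefix 135 -> 0
        System.out.println(partitionFor("15013685858")); // unknown prefix -> 5
    }
}
```

Running this confirms that every phone number lands in exactly one of the six partitions 0 through 5.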
FlowSumArea.java:
package cn.darrenchan.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

/**
 * Aggregates traffic from the raw logs and writes each province's results
 * to a separate output file. Two mechanisms are customized:
 * 1. The partitioning logic, via a custom Partitioner.
 * 2. The number of concurrent reduce tasks.
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends
            Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");
            // Pick out the fields we need
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);
            // Wrap the data as a key-value pair and emit it
            context.write(new Text(phoneNum),
                    new FlowBean(phoneNum, upFlow, downFlow));
        }
    }

    public static class FlowSumAreaReducer extends
            Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Context context) throws IOException, InterruptedException {
            long upFlowCounter = 0;
            long downFlowCounter = 0;
            for (FlowBean bean : values) {
                upFlowCounter += bean.getUpFlow();
                downFlowCounter += bean.getDownFlow();
            }
            context.write(key, new FlowBean(key.toString(), upFlowCounter,
                    downFlowCounter));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The number of reduce tasks should match the number of partitions.
        // Setting it to 1 does not fail (partitioning is bypassed); 2 through
        // 5 fail, because the partitioner can return partition numbers up to
        // 5; 7, 8, 9, ... do not fail either, but the extra reducers receive
        // no data and produce empty files.
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
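The relationship between the reducer count and the partitioner can be captured in a small standalone sketch (the class name ReducerCountCheck is hypothetical, not part of the project). Hadoop passes the configured reducer count to getPartition as numPartitions, and a returned partition number outside [0, numPartitions) makes the map task fail with an "Illegal partition" error; with exactly one reducer, the custom partitioner is never invoked at all.

```java
// Sketch of the rule behind setNumReduceTasks: which reducer counts are
// compatible with a partitioner whose partition numbers range over 0..5.
public class ReducerCountCheck {
    // Highest partition number AreaPartitioner can return.
    static final int MAX_PARTITION = 5;

    static boolean isValid(int numReduceTasks) {
        // With a single reducer, Hadoop skips partitioning entirely.
        if (numReduceTasks == 1) return true;
        // Otherwise every returned partition must be < numReduceTasks.
        return MAX_PARTITION < numReduceTasks;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 8; n++) {
            System.out.println(n + " reducer(s): "
                    + (isValid(n) ? "OK" : "Illegal partition"));
        }
    }
}
```

This reproduces the behavior noted in the driver: 1 works, 2 through 5 fail, and 6 or more succeed (with empty output files beyond the sixth).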
FlowBean.java:
package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNum; // phone number
    private long upFlow;     // upstream traffic
    private long downFlow;   // downstream traffic
    private long sumFlow;    // total traffic

    // A no-arg constructor is required so Hadoop can instantiate the bean
    // during deserialization.
    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the stream.
    // Fields must be read in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Sort in descending order of total traffic.
    @Override
    public int compareTo(FlowBean flowBean) {
        return sumFlow > flowBean.getSumFlow() ? -1 : 1;
    }
}
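The rule that readFields must consume fields in exactly the order write produced them can be demonstrated without Hadoop, since DataInput/DataOutput come from java.io. The sketch below (the class name WireFormatDemo is hypothetical) writes the same four fields as FlowBean.write() and reads them back in the same order as readFields():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone sketch of the FlowBean wire format: write fields in the same
// order as write(), then read them back in the same order as readFields().
public class WireFormatDemo {
    static String roundTrip() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        // Same order as FlowBean.write()
        out.writeUTF("13502468823"); // phoneNum
        out.writeLong(100L);         // upFlow
        out.writeLong(200L);         // downFlow
        out.writeLong(300L);         // sumFlow

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        // Same order as FlowBean.readFields()
        return in.readUTF() + "," + in.readLong() + ","
                + in.readLong() + "," + in.readLong();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip()); // 13502468823,100,200,300
    }
}
```

Swapping the read order (e.g. reading a long before the UTF string) would corrupt every field that follows, which is why the comment in FlowBean stresses matching orders.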
Package the project as area.jar and run:
hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea
The run produces the following output:
17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0005
17/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0005
17/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/
17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_0005
17/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false
17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%
17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%
17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%
17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%
17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully
17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1152
FILE: Number of bytes written=652142
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2338
HDFS: Number of bytes written=526
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
Job Counters
Launched map tasks=1
Launched reduce tasks=6
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2663
Total time spent by all reduces in occupied slots (ms)=83315
Total time spent by all map tasks (ms)=2663
Total time spent by all reduce tasks (ms)=83315
Total vcore-seconds taken by all map tasks=2663
Total vcore-seconds taken by all reduce tasks=83315
Total megabyte-seconds taken by all map tasks=2726912
Total megabyte-seconds taken by all reduce tasks=85314560
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1152
Input split bytes=124
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1152
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =6
Failed Shuffles=0
Merged Map outputs=6
GC time elapsed (ms)=524
CPU time spent (ms)=3210
Physical memory (bytes) snapshot=509775872
Virtual memory (bytes) snapshot=2547916800
Total committed heap usage (bytes)=218697728
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=526
When the run completes, we find that six output files were generated, as shown below:
The final results confirm that records were indeed partitioned as expected:
While the job was running, we kept monitoring the node to check whether six reduce tasks were really working concurrently; at the peak there were indeed six YarnChild processes, confirming that the program behaves as intended.
Last login: Sun Feb 26 04:26:01 2017 from 192.168.230.1
[hadoop@weekend110 ~] jps
2473 NameNode
8703 RunJar
9214 Jps
9029 YarnChild
8995 YarnChild
2747 SecondaryNameNode
8978 -- process information unavailable
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
9053 YarnChild
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
2569 DataNode
9330 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9495 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
9558 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9580 Jps
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9598 YarnChild
9482 MRAppMaster
2747 SecondaryNameNode
9623 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9650 Jps
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9696 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9772 Jps
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9817 Jps
9665 -- process information unavailable
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9681 YarnChild
9872 Jps
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9921 Jps
2747 SecondaryNameNode
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9770 YarnChild
9751 -- process information unavailable
2891 ResourceManager
9386 RunJar
10021 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10079 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
10090 Jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10099 Jps
2891 ResourceManager
2992 NodeManager
2569 DataNode
Reposted from: https://www.cnblogs.com/DarrenChan/p/6464259.html