Requirements:

Building on Practice (1), implement a custom partitioning mechanism. For example, group records into different provinces according to the phone number, run each group on a different reducer, and store the final results in separate output files.

Compute traffic totals from the raw traffic log, and write the statistics for users in different provinces to different output files.

Approach:

Two mechanisms need to be customized:

1. Rework the partitioning logic by implementing a custom Partitioner, which decides which partition each record is assigned to.

The Partitioner's job is to split the Mapper's intermediate output into partitions so that all records in the same partition are handed to the same Reducer; it directly affects load balancing in the Reduce phase.
Partitioner declares a single method:
getPartition(KEY key, VALUE value, int numPartitions)
The first two parameters are the map output key and value, and numPartitions is the number of reduce tasks.

2. Customize the number of concurrent reduce tasks so that several reducers work at the same time (a reference sketch follows below).
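For reference, the behavior we are replacing is Hadoop's default hash partitioning. The following is a minimal sketch of roughly what the built-in HashPartitioner does, reconstructed here for illustration rather than copied from the project:

import org.apache.hadoop.mapreduce.Partitioner;

// Roughly how the default HashPartitioner assigns partitions: the key's hash,
// masked to be non-negative, modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Our AreaPartitioner below replaces this hash with a dictionary lookup on the phone-number prefix, and the driver must set the reduce task count to match the number of partitions that lookup can return.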

The project layout is as follows:

AreaPartitioner.java:

package cn.darrenchan.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    /**
     * The mapping is simply hard-coded here; in practice this could query a
     * database and return the province code for each phone-number prefix.
     */
    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key, look up its prefix in the area
        // dictionary, and return a different partition number per province;
        // unknown prefixes fall into partition 5.
        int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null ? 5
                : areaMap.get(key.toString().substring(0, 3));
        return areaCoder;
    }
}
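The static block above hard-codes the prefix-to-province mapping. As its comment suggests, the mapping could instead be loaded from a database. Below is a hedged sketch of that idea using plain JDBC; the connection URL, credentials, and the area_code table are assumptions for illustration, not part of the project:

// Hypothetical replacement for the static initializer: load the mapping via JDBC.
static {
    String url = "jdbc:mysql://localhost:3306/flow"; // assumed connection details
    try (java.sql.Connection conn = java.sql.DriverManager.getConnection(url, "user", "password");
         java.sql.Statement stmt = conn.createStatement();
         java.sql.ResultSet rs = stmt.executeQuery("SELECT prefix, province_code FROM area_code")) {
        while (rs.next()) {
            areaMap.put(rs.getString("prefix"), rs.getInt("province_code"));
        }
    } catch (java.sql.SQLException e) {
        throw new ExceptionInInitializerError(e);
    }
}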

FlowSumArea.java:

package cn.darrenchan.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

/**
 * Compute traffic totals from the raw traffic log and write the statistics for
 * users in different provinces to different files.
 * Two mechanisms need to be customized:
 * 1. the partitioning logic, via a custom Partitioner;
 * 2. the number of concurrent reduce tasks.
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends
            Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");
            // Pick out the fields we need
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);
            // Wrap them as a key/value pair and emit
            context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow, downFlow));
        }
    }

    public static class FlowSumAreaReducer extends
            Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUpFlow();
                d_flow_counter += bean.getDownFlow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of reduce tasks; it should match the number of partitions.
        // Setting it to 1 does not fail (with a single reducer the partition number
        // is effectively ignored); 2 through 5 fail, because some records get a
        // partition number beyond the reducer count; 7, 8, 9, ... also work, since
        // the extra reducers simply receive no data and write empty files.
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
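As the comment on setNumReduceTasks notes, if getPartition returns a number that is greater than or equal to the configured reduce task count, the map tasks fail with an "Illegal partition" error. A hedged variant of getPartition (not part of the original project; the names are taken from the AreaPartitioner above) that guards against such a mismatch could look like this:

@Override
public int getPartition(KEY key, VALUE value, int numPartitions) {
    // Look up the province code for the 3-digit phone prefix; unknown prefixes map to 5.
    Integer area = areaMap.get(key.toString().substring(0, 3));
    int partition = (area == null) ? 5 : area;
    // Safety net: if the job is configured with fewer reducers than partitions,
    // fold the number back into the valid range instead of failing the map task.
    return partition % numPartitions;
}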

FlowBean.java:

package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNum; // phone number
    private long upFlow;     // upstream traffic
    private long downFlow;   // downstream traffic
    private long sumFlow;    // total traffic

    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the data stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public int compareTo(FlowBean flowBean) {
        return sumFlow > flowBean.getSumFlow() ? -1 : 1;
    }
}
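Because readFields must consume fields in exactly the order write emitted them, a quick way to check that the bean is wired up correctly is a local round trip through a byte buffer. This is a minimal sketch for illustration only; the phone number and flow values are made-up sample data:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // Made-up sample record: 100 bytes up, 200 bytes down.
        FlowBean original = new FlowBean("13512345678", 100L, 200L);

        // Serialize the bean much as Hadoop would during the shuffle.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean; readFields reads the fields back
        // in the same order write() wrote them.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // expected output: 100  200  300
    }
}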

Package the project as area.jar and run the following command:

hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea

We can see the following output while the job runs:

17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0005
17/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0005
17/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/
17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_0005
17/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false
17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%
17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%
17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%
17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%
17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully
17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1152
FILE: Number of bytes written=652142
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2338
HDFS: Number of bytes written=526
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
Job Counters
Launched map tasks=1
Launched reduce tasks=6
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2663
Total time spent by all reduces in occupied slots (ms)=83315
Total time spent by all map tasks (ms)=2663
Total time spent by all reduce tasks (ms)=83315
Total vcore-seconds taken by all map tasks=2663
Total vcore-seconds taken by all reduce tasks=83315
Total megabyte-seconds taken by all map tasks=2726912
Total megabyte-seconds taken by all reduce tasks=85314560
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1152
Input split bytes=124
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1152
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =6
Failed Shuffles=0
Merged Map outputs=6
GC time elapsed (ms)=524
CPU time spent (ms)=3210
Physical memory (bytes) snapshot=509775872
Virtual memory (bytes) snapshot=2547916800
Total committed heap usage (bytes)=218697728
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=526

After the job completes, we find that six output files were generated this time, part-r-00000 through part-r-00005.

Looking at the final results, we can see that the records were indeed partitioned the way we expected.

While the job was running, we kept monitoring it to check whether all six reducers were working at the same time. At its peak there were indeed six YarnChild processes, which confirms the program behaves as intended.

Last login: Sun Feb 26 04:26:01 2017 from 192.168.230.1
[hadoop@weekend110 ~] jps
2473 NameNode
8703 RunJar
9214 Jps
9029 YarnChild
8995 YarnChild
2747 SecondaryNameNode
8978 -- process information unavailable
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
9053 YarnChild
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
8799 MRAppMaster
2569 DataNode
9330 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9495 Jps
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
9558 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9580 Jps
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9598 YarnChild
9482 MRAppMaster
2747 SecondaryNameNode
9623 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9650 Jps
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9696 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9772 Jps
9482 MRAppMaster
9665 YarnChild
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9817 Jps
9665 -- process information unavailable
2747 SecondaryNameNode
9681 YarnChild
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9681 YarnChild
9872 Jps
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
9921 Jps
2747 SecondaryNameNode
9770 YarnChild
9751 YarnChild
2891 ResourceManager
9386 RunJar
2992 NodeManager
9730 YarnChild
2569 DataNode
9704 YarnChild
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
9770 YarnChild
9751 -- process information unavailable
2891 ResourceManager
9386 RunJar
10021 Jps
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10079 Jps
2891 ResourceManager
9386 RunJar
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
10090 Jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
2891 ResourceManager
2992 NodeManager
2569 DataNode
[hadoop@weekend110 ~] jps
2473 NameNode
9482 MRAppMaster
2747 SecondaryNameNode
10099 Jps
2891 ResourceManager
2992 NodeManager
2569 DataNode

Reposted from: https://www.cnblogs.com/DarrenChan/p/6464259.html
