Mapreduce的分区—Partitioner
1. 需求
将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。
2. 分析
Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。
默认的分发规则为:根据key的hashcode%reducetask数来分发
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner,然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 实现
public class ProvincePartitioner extends Partitioner<Text, FlowBean>public static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();static{provinceMap.put("134", 0);provinceMap.put("135", 1);provinceMap.put("136", 2);provinceMap.put("137", 3);provinceMap.put("138", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = provinceMap.get(key.toString().substring(0, 3));if (code != null) {return code;}return 5;}}
public class FlowSumProvince {public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{Text k = new Text();FlowBean v = new FlowBean();@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {//拿取一行文本转为StringString line = value.toString();//按照分隔符\t进行分割String[] fileds = line.split("\t");//获取用户手机号String phoneNum = fileds[1];long upFlow = Long.parseLong(fileds[fileds.length-3]);long downFlow = Long.parseLong(fileds[fileds.length-2]);k.set(phoneNum);v.set(upFlow, downFlow);context.write(k,v);}}public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{FlowBean v = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException {long upFlowCount = 0;long downFlowCount = 0;for (FlowBean flowBean : flowBeans) {upFlowCount += flowBean.getUpFlow();downFlowCount += flowBean.getDownFlow();}v.set(upFlowCount, downFlowCount);context.write(key, v);}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();conf.set("mapreduce.framework.name", "local");Job job = Job.getInstance(conf);//指定我这个 job 所在的 jar包位置job.setJarByClass(FlowSumProvince.class);//指定我们使用的Mapper是那个类 reducer是哪个类job.setMapperClass(FlowSumProvinceMapper.class);job.setReducerClass(FlowSumProvinceReducer.class);// job.setCombinerClass(FlowSumProvinceReducer.class);// 设置我们的业务逻辑 Mapper 类的输出 key 和 value 的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 设置我们的业务逻辑 Reducer 类的输出 key 和 value 的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//这里设置运行reduceTask的个数//getPartition 返回的分区个数 = NumReduceTasks 正常执行//getPartition 返回的分区个数 > NumReduceTasks 报错:Illegal partition//getPartition 返回的分区个数 < NumReduceTasks 可以执行 ,多出空白文件job.setNumReduceTasks(10);//这里指定使用我们自定义的分区组件job.setPartitionerClass(ProvincePartitioner.class);FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));// 指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}}
转载于:https://blog.51cto.com/13587708/2174193
Mapreduce的分区—Partitioner相关推荐
- 【MapReduce】分区(分区实战案例)、Combiner、Shuffer
分区(分区实战案例).Combiner.Shuffer 1 分区 2 根据部门号建立分区 3 Combiner 4 Shuffer 手动反爬虫,禁止转载: 原博地址 https://blog.csdn ...
- MapReduce中的partitioner
1.日志源文件: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 ...
- MapReduce 进阶:Partitioner 组件
概述 Partitioner 组件可以让 Map 对 Key 进行分区,从而将不同分区的 Key 交由不同的 Reduce 处理.如果这么说让你觉得有一些笼统的话,那么本文可能很适合你,因为本文会依据 ...
- 并行中的分区Partitioner
本篇介绍在C#中,把一个大集合,或大数组分成若干个区来执行.Demo中是把一组字符串放在list中,然后并行生成MD5串,返回回来. using System; using System.Collec ...
- Spark自定义分区(Partitioner)
我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略(这两种分区的代码解析可以参见:<Spark分区器HashPartitioner和Ran ...
- MapReduce(深入)---案例之用户上行流量 下行流量 总流量倒序 按省份分区
1. MapReduce的输入和输出 MapReduce执行流程图 详细图解如下 maptask通过自带的TextInputFormat将数据按照一行一行的读取 , 用每一行的起始偏移量作为k , 每 ...
- MapReduce自定义Partitioner
Shuffle过程是会按照Map中输出的key,把数据默认分到一个分区中,那么默认的是如何实现的? HashPartitioner是Partitioner默认的分区规则,其中numReduceTask ...
- Hadoop入门(八)Mapreduce高级shuffle之Partitioner
一.Partitioner概述 Map阶段总共五个步骤,2就是一个分区操作 哪个key到哪个Reducer的分配过程,是由Partitioner规定的. 二.Hadoop内置Partitioner M ...
- 2.27 MapReduce Shuffle过程如何在Job中进行设置
一.shuffle过程 总的来说: *分区 partitioner *排序 sort *copy (用户无法干涉) 拷贝 *分组 group 可设置 *压缩 compress *combiner ma ...
最新文章
- linux下Redis以及phpredis扩展安装
- 36进12第二场淘汰赛:老牟如何晋级?(视频)
- iview render的时候可以写控件的基本格式
- Android官方开发文档Training系列课程中文版:支持不同的设备之支持不同的语言
- python读取大文件的坑_如何在Python中读取大文件的特定部分
- 三次技术转型,程序员的北漂奋斗史
- 通过ng-change选择ng-object
- Windows Xp下 无法定位程序输入点WSAPoll于动态链接库ws2_32.dll 的解决办法
- h5实现海报分享功能
- JavaScript详细版
- 通过Nginx搭建flv流媒体服务器
- 游戏开发论坛_《原神》称霸苹果「2020年度iPhone游戏奖」
- 中国象棋棋盘java_中国象棋棋子及棋盘的绘制
- FSA确定性识别算法
- JavaScript高级第04天笔记
- Storm集成HBase、JDBC、Kafka、Hive
- 风险模型为什么是量化投资成功的关键?
- maya(学习笔记)之骨骼中关节的轴向确定方法
- 05年11月6日考试模拟题
- 学Linux到底需要学什么
热门文章
- js代码测试。【一定要在真实工程下来测试是否好用】
- 【Oracle】创建用户
- BFS-迷宫问题-用宽度(广度)优先搜索解决最优路径问题
- 解决chrome浏览器崩溃,再次安装不上问题
- RestTemplate的异常 Not enough variables available to expand
- netty解决方法 io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
- 解决Ajax异步请求中传数组参数,后台无法接收问题
- thinkphp5(tp5)中success跳转页面和弹窗问题解决
- This dependency was not found: * !!vue-style-loader!css-loader?……解决方案
- Python机器学习笔记:异常点检测算法——Isolation Forest