1. 需求
将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。
2. 分析
Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。
默认的分发规则为:根据key的hashcode%reducetask数来分发
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner,然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 实现

public class ProvincePartitioner extends Partitioner<Text, FlowBean>public static HashMap<String, Integer>  provinceMap = new HashMap<String, Integer>();static{provinceMap.put("134", 0);provinceMap.put("135", 1);provinceMap.put("136", 2);provinceMap.put("137", 3);provinceMap.put("138", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = provinceMap.get(key.toString().substring(0, 3));if (code != null) {return code;}return 5;}}
public class FlowSumProvince {public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{Text k = new Text();FlowBean  v = new FlowBean();@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {//拿取一行文本转为StringString line = value.toString();//按照分隔符\t进行分割String[] fileds = line.split("\t");//获取用户手机号String phoneNum = fileds[1];long upFlow = Long.parseLong(fileds[fileds.length-3]);long downFlow = Long.parseLong(fileds[fileds.length-2]);k.set(phoneNum);v.set(upFlow, downFlow);context.write(k,v);}}public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{FlowBean  v  = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException {long upFlowCount = 0;long downFlowCount = 0;for (FlowBean flowBean : flowBeans) {upFlowCount += flowBean.getUpFlow();downFlowCount += flowBean.getDownFlow();}v.set(upFlowCount, downFlowCount);context.write(key, v);}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();conf.set("mapreduce.framework.name", "local");Job job = Job.getInstance(conf);//指定我这个 job 所在的 jar包位置job.setJarByClass(FlowSumProvince.class);//指定我们使用的Mapper是那个类  reducer是哪个类job.setMapperClass(FlowSumProvinceMapper.class);job.setReducerClass(FlowSumProvinceReducer.class);//        job.setCombinerClass(FlowSumProvinceReducer.class);// 设置我们的业务逻辑 Mapper 类的输出 key 和 value 的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 设置我们的业务逻辑 Reducer 类的输出 key 和 value 的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//这里设置运行reduceTask的个数//getPartition 返回的分区个数 = NumReduceTasks   正常执行//getPartition 返回的分区个数 > NumReduceTasks   报错:Illegal partition//getPartition 返回的分区个数 < NumReduceTasks   可以执行 ,多出空白文件job.setNumReduceTasks(10);//这里指定使用我们自定义的分区组件job.setPartitionerClass(ProvincePartitioner.class);FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));// 指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}}

转载于:https://blog.51cto.com/13587708/2174193

Mapreduce的分区—Partitioner相关推荐

  1. 【MapReduce】分区(分区实战案例)、Combiner、Shuffer

    分区(分区实战案例).Combiner.Shuffer 1 分区 2 根据部门号建立分区 3 Combiner 4 Shuffer 手动反爬虫,禁止转载: 原博地址 https://blog.csdn ...

  2. MapReduce中的partitioner

    1.日志源文件: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 ...

  3. MapReduce 进阶:Partitioner 组件

    概述 Partitioner 组件可以让 Map 对 Key 进行分区,从而将不同分区的 Key 交由不同的 Reduce 处理.如果这么说让你觉得有一些笼统的话,那么本文可能很适合你,因为本文会依据 ...

  4. 并行中的分区Partitioner

    本篇介绍在C#中,把一个大集合,或大数组分成若干个区来执行.Demo中是把一组字符串放在list中,然后并行生成MD5串,返回回来. using System; using System.Collec ...

  5. Spark自定义分区(Partitioner)

    我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略(这两种分区的代码解析可以参见:<Spark分区器HashPartitioner和Ran ...

  6. MapReduce(深入)---案例之用户上行流量 下行流量 总流量倒序 按省份分区

    1. MapReduce的输入和输出 MapReduce执行流程图 详细图解如下 maptask通过自带的TextInputFormat将数据按照一行一行的读取 , 用每一行的起始偏移量作为k , 每 ...

  7. MapReduce自定义Partitioner

    Shuffle过程是会按照Map中输出的key,把数据默认分到一个分区中,那么默认的是如何实现的? HashPartitioner是Partitioner默认的分区规则,其中numReduceTask ...

  8. Hadoop入门(八)Mapreduce高级shuffle之Partitioner

    一.Partitioner概述 Map阶段总共五个步骤,2就是一个分区操作 哪个key到哪个Reducer的分配过程,是由Partitioner规定的. 二.Hadoop内置Partitioner M ...

  9. 2.27 MapReduce Shuffle过程如何在Job中进行设置

    一.shuffle过程 总的来说: *分区 partitioner *排序 sort *copy (用户无法干涉) 拷贝 *分组 group 可设置 *压缩 compress *combiner ma ...

最新文章

  1. linux下Redis以及phpredis扩展安装
  2. 36进12第二场淘汰赛:老牟如何晋级?(视频)
  3. iview render的时候可以写控件的基本格式
  4. Android官方开发文档Training系列课程中文版:支持不同的设备之支持不同的语言
  5. python读取大文件的坑_如何在Python中读取大文件的特定部分
  6. 三次技术转型,程序员的北漂奋斗史
  7. 通过ng-change选择ng-object
  8. Windows Xp下 无法定位程序输入点WSAPoll于动态链接库ws2_32.dll 的解决办法
  9. h5实现海报分享功能
  10. JavaScript详细版
  11. 通过Nginx搭建flv流媒体服务器
  12. 游戏开发论坛_《原神》称霸苹果「2020年度iPhone游戏奖」
  13. 中国象棋棋盘java_中国象棋棋子及棋盘的绘制
  14. FSA确定性识别算法
  15. JavaScript高级第04天笔记
  16. Storm集成HBase、JDBC、Kafka、Hive
  17. 风险模型为什么是量化投资成功的关键?
  18. maya(学习笔记)之骨骼中关节的轴向确定方法
  19. 05年11月6日考试模拟题
  20. 学Linux到底需要学什么

热门文章

  1. js代码测试。【一定要在真实工程下来测试是否好用】
  2. 【Oracle】创建用户
  3. BFS-迷宫问题-用宽度(广度)优先搜索解决最优路径问题
  4. 解决chrome浏览器崩溃,再次安装不上问题
  5. RestTemplate的异常 Not enough variables available to expand
  6. netty解决方法 io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
  7. 解决Ajax异步请求中传数组参数,后台无法接收问题
  8. thinkphp5(tp5)中success跳转页面和弹窗问题解决
  9. This dependency was not found: * !!vue-style-loader!css-loader?……解决方案
  10. Python机器学习笔记:异常点检测算法——Isolation Forest