自定义分组
NameGroup

package test;import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;public class NameGroup  implements RawComparator<ConsumeWritable>{public int compare(ConsumeWritable o1, ConsumeWritable o2) {return o1.getName().compareTo(o2.getName());}/*** 封装key1:zhangsan,135.00  b1=12个字节   key2:yuti,11032 b2=8个字节   * 将组合key转为二进制数组* 比较两个对象在二进制层面* b1 第一个CosumeWritable对象转成的字节数据* s1代表从b1的第几个字节比较* l1代表b1的长度* compareBytes(b1,s1,l1-4(比较字节个数))*   */public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);}}

ConsumeWritable

package test;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class ConsumeWritable implements WritableComparable<ConsumeWritable>{private  String name;private float money;public ConsumeWritable() {}public ConsumeWritable(String name, float money) {super();this.name = name;this.money = money;}//从源码中的获得public void  set(String  name,float money){this.name=name;this.money=money;}public String getName() {return name;}public void setName(String name) {this.name = name;}public float getMoney() {return money;}public void setMoney(float money) {this.money = money;}//序列化public void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeFloat(money);}//反序列化public void readFields(DataInput in) throws IOException {name=in.readUTF();money=in.readFloat();}public int compareTo(ConsumeWritable o) {//第一次比较int compareTo = this.getName().compareTo(o.getName());if (compareTo !=0) {return compareTo;}//第二次比较  注意:普通的数据类型是没有compaerTo方法 所以要转换为他的包装类return Float.valueOf(this.getMoney()).compareTo(Float.valueOf(o.getMoney()));}//比较对象两个对象,需要重写equals和hashcode()方法@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + Float.floatToIntBits(money);result = prime * result + ((name == null) ? 0 : name.hashCode());return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;ConsumeWritable other = (ConsumeWritable) obj;if (Float.floatToIntBits(money) != Float.floatToIntBits(other.money))return false;if (name == null) {if (other.name != null)return false;} else if (!name.equals(other.name))return false;return true;}@Overridepublic String toString() {return name + "," + money;}}

主要程序:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*** 主要思想:根据shuffle阶段排序是根据key来排序的* @author Administrator**/
public class SecondSortMapReduce extends Configured implements Tool{//map映射public static class  SecondSortMapper extends Mapper<LongWritable, Text, ConsumeWritable, FloatWritable>{private ConsumeWritable mapOutPutKey = new ConsumeWritable();private FloatWritable mapOutPutValue= new FloatWritable();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//把读取出来的内容 装换为String  类型String  line = value.toString();//通过制表符分割String[] split = line.split("\t");mapOutPutKey.set(split[0], Float.valueOf(split[1]));mapOutPutValue.set(Float.parseFloat(split[1]));System.err.print("key: "+mapOutPutKey.toString());System.err.print("->value: "+mapOutPutValue+"\n");context.write(mapOutPutKey, mapOutPutValue);}}//分区   参数是map输出的public static class MyPartitoner  extends Partitioner<ConsumeWritable, FloatWritable>{@Overridepublic int getPartition(ConsumeWritable key, FloatWritable value,int numPartitions) {//根据hashpatitioner源码的得到return (key.getName().hashCode() & Integer.MAX_VALUE) % numPartitions;}}public static class SecondSortReducer extends Reducer<ConsumeWritable, FloatWritable, Text, FloatWritable>{private  Text  OutPutKey =new Text();private  FloatWritable OutPutValue = new FloatWritable();@Overrideprotected void reduce(ConsumeWritable key,Iterable<FloatWritable> values,Context context )throws IOException, InterruptedException {System.out.print("key:"+key.toString()+"["+"value:");OutPutKey.set(key.getName());for (FloatWritable floatWritable : values) {System.out.print(floatWritable+",");OutPutValue.set(floatWritable.get());context.write(OutPutKey, OutPutValue);}System.out.println("]"+"\n");}}public int run(String[] args) throws Exception {// 1.创建Configuration对象,获取配置文件Configuration conf = new Configuration();// 2.构建MapReduce Job对象Job job = Job.getInstance(conf, this.getClass().getSimpleName());job.setJarByClass(getClass());// 3.输入目录/文件(input) -》 map -》 reduce -》输出路径 (output)// 3.1 设置输入文件所在目录Path inPath = new Path(args[0]);FileInputFormat.setInputPaths(job, inPath);// 3.2 设置Map输出信息job.setMapperClass(SecondSortMapper.class);job.setMapOutputKeyClass(ConsumeWritable.class);job.setMapOutputValueClass(FloatWritable.class);//自定义分区job.setPartitionerClass(MyPartitoner.class);//自定义分组job.setGroupingComparatorClass(NameGroup.class);// 3.3设置reduce的输出信息job.setReducerClass(SecondSortReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// 3.4 设置输出路径Path outPath = new Path(args[1]);FileSystem fs = outPath.getFileSystem(conf);if (fs.exists(outPath)) {fs.delete(outPath, true);}FileOutputFormat.setOutputPath(job, outPath);// 提交job/*** 可以详细显示任务的进度信息 job.submit()这种方式是做不到的*/boolean isSuccessed = job.waitForCompletion(true);// job.submit(); 不推荐return isSuccessed ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();args = new String[] { "hdfs://hive01:8020/input/ceshi.txt", "hdfs://hive01:8020/outputtest1"};int status = ToolRunner.run(conf, new SecondSortMapReduce(), args);System.exit(status);}}

MapReduce二次排序分区,分组优化相关推荐

  1. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  2. Hadoop学习笔记—11.MapReduce中的排序和分组

    Hadoop学习笔记-11.MapReduce中的排序和分组 一.写在之前的 1.1 回顾Map阶段四大步骤 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出 ...

  3. MapReduce二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...

  4. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  5. Hadoop Mapreduce分区、分组、二次排序过程详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动    (1)最简单的过程:  map - reduce    (2)定制了partition ...

  6. Hadoop Mapreduce分区、分组、二次排序过程详解[转]

    徐海蛟 教学用途 1.MapReduce中数据流动 (1)最简单的过程: map - reduce (2)定制了partitioner以将map的结果送往指定reducer的过程: map - par ...

  7. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  8. MapReduce自定义二次排序流程

    每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序. MapReduce ...

  9. Mapreduce的排序、全排序以及二次排序

    一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可 ...

最新文章

  1. 网闸与防火墙的区别是什么
  2. 关于node.js的web框架的应用及并发性能测试
  3. 【Android工具】高端DLNA音乐播放器Hi-Fi Cast 来自play
  4. 最新17个紫色风格网页设计作品欣赏
  5. SAP Commerce Cloud Spartacus UI footer 区域的设计模型
  6. 用十行代码快速创建权限管理系统
  7. 2018年4月13日_Java的最新发展– 2018年4月下旬
  8. 实例48:python
  9. 2021-07-23 小记
  10. Beginning iCloud in iOS 5 Tutorial Part 2(转载)
  11. linux复制以a开头的文件,linux部分试题
  12. java 国际象棋 中文版_卡尔波夫国际象棋豪华版
  13. 洛谷 P1028 数的计算【递推】
  14. php 表单验证代码,php 表单验证实现代码
  15. 大数据:阿里数据仓库建模及管理体系OneData什么是阿里onedata
  16. 第一集 斗罗世界 第六章
  17. 电脑连接电视的方法---HDMI篇
  18. 搭建企业私有Git服务
  19. tf-faster-rcnn训练报错: Loaded runtime CuDNN library: 7.0.5 but source was compiled with: 7.1.4.r-rcn
  20. CentOS 7 安装Ibus中文输入法

热门文章

  1. scrapy--(failed 3 times): [<twisted.python.failure.Failure twisted.web._newclient.ParseError
  2. 2005年世界500强公司名单
  3. 套料排版代码python_2D板材排版套料开源代码
  4. 怎样留住你,我的攻城狮——如何避免优质技术人才流失
  5. MongoDB自学笔记下载
  6. 跳过密码打开excel表格xlsx,忘记excel表格xlsx密码如何找回?
  7. java计算机毕业设计科大学生党员之家设计源程序+mysql+系统+lw文档+远程调试
  8. 20个热门少儿编程网站与应用【家长必读】
  9. geospatial地理位置
  10. Android 7.1 增加屏幕边缘滑动事件(手势滑动)两种方式(Back,Home,Menu功能键) 免开启无障碍功能