• 问题描述:

    输入文件格式如下:

    name1    2

    name3    4

    name1    6

    name1    1

    name3    3

    name1    0

    要求输出的文件格式如下:

    name1    0,1,2,6

    name3    3,4

    要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。

    思路:

    常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

    但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

    先按照name分组,再在name中内部进行排序。

    解决方法:

    运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

    由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

    setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

    对于组内的排序,可以利用setSortComparatorClass来实现,

    这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

    这里就可以来进行组内排序。

    具体代码:

    Hadoop版本号:hadoop1.1.2

    自定义组合键

    view sourceprint?
    01.package whut;
    02.import java.io.DataInput;
    03.import java.io.DataOutput;
    04.import java.io.IOException;
    05.import org.apache.hadoop.io.IntWritable;
    06.import org.apache.hadoop.io.Text;
    07.import org.apache.hadoop.io.WritableComparable;
    08.//自定义组合键策略
    09.//java基本类型数据
    10.public class TextInt implements WritableComparable{
    11.//直接利用java的基本数据类型
    12.private String firstKey;
    13.private int secondKey;
    14.//必须要有一个默认的构造函数
    15.public String getFirstKey() {
    16.return firstKey;
    17.}
    18.public void setFirstKey(String firstKey) {
    19.this.firstKey = firstKey;
    20.}
    21.public int getSecondKey() {
    22.return secondKey;
    23.}
    24.public void setSecondKey(int secondKey) {
    25.this.secondKey = secondKey;
    26.}
    27. 
    28.@Override
    29.public void write(DataOutput out) throws IOException {
    30.// TODO Auto-generated method stub
    31.out.writeUTF(firstKey);
    32.out.writeInt(secondKey);
    33.}
    34.@Override
    35.public void readFields(DataInput in) throws IOException {
    36.// TODO Auto-generated method stub
    37.firstKey=in.readUTF();
    38.secondKey=in.readInt();
    39.}
    40.//map的键的比较就是根据这个方法来进行的
    41.@Override
    42.public int compareTo(Object o) {
    43.// TODO Auto-generated method stub
    44.TextInt ti=(TextInt)o;
    45.//利用这个来控制升序或降序
    46.//this本对象写在前面代表是升序
    47.//this本对象写在后面代表是降序
    48.return this.getFirstKey().compareTo(ti.getFirstKey());
    49.}
    50.}

    分组策略

    view sourceprint?
    01.package whut;
    02.import org.apache.hadoop.io.WritableComparable;
    03.import org.apache.hadoop.io.WritableComparator;
    04.//主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
    05.public class TextComparator extends WritableComparator {
    06.//必须要调用父类的构造器
    07.protected TextComparator() {
    08.super(TextInt.class,true);//注册comparator
    09.}
    10.@Override
    11.public int compare(WritableComparable a, WritableComparable b) {
    12.// TODO Auto-generated method stub
    13.TextInt ti1=(TextInt)a;
    14.TextInt ti2=(TextInt)b;
    15.return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    16.}
    17.}

    组内排序策略

    view sourceprint?
    01.package whut;
    02.import org.apache.hadoop.io.WritableComparable;
    03.import org.apache.hadoop.io.WritableComparator;
    04.//分组内部进行排序,按照第二个字段进行排序
    05.public class TextIntComparator extends WritableComparator {
    06.public TextIntComparator()
    07.{
    08.super(TextInt.class,true);
    09.}
    10.//这里可以进行排序的方式管理
    11.//必须保证是同一个分组的
    12.//a与b进行比较
    13.//如果a在前b在后,则会产生升序
    14.//如果a在后b在前,则会产生降序
    15.@Override
    16.public int compare(WritableComparable a, WritableComparable b) {
    17.// TODO Auto-generated method stub
    18.TextInt ti1=(TextInt)a;
    19.TextInt ti2=(TextInt)b;
    20.//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
    21.if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
    22.return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    23.else
    24.return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1
    25.}
    26. 
    27.}

    分区策略

    view sourceprint?
    01.package whut;
    02.import org.apache.hadoop.io.IntWritable;
    03.import org.apache.hadoop.mapreduce.Partitioner;
    04.//参数为map的输出类型
    05.public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
    06.@Override
    07.public int getPartition(TextInt key, IntWritable value, int numPartitions) {
    08.// TODO Auto-generated method stub
    09.return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    10.}
    11.}

    MapReduce策略

    view sourceprint?
    001.package whut;
    002.import java.io.IOException;
    003.import org.apache.hadoop.conf.Configuration;
    004.import org.apache.hadoop.conf.Configured;
    005.import org.apache.hadoop.fs.Path;
    006.import org.apache.hadoop.io.IntWritable;
    007.import org.apache.hadoop.io.Text;
    008.import org.apache.hadoop.mapreduce.Job;
    009.import org.apache.hadoop.mapreduce.Mapper;
    010.import org.apache.hadoop.mapreduce.Reducer;
    011.import org.apache.hadoop.mapreduce.Mapper.Context;
    012.import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    013.import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    014.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    015.import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    016.import org.apache.hadoop.util.Tool;
    017.import org.apache.hadoop.util.ToolRunner;
    018.//需要对数据进行分组以及组内排序的时候
    019.public class SortMain extends Configured implements Tool{
    020.//这里设置输入文格式为KeyValueTextInputFormat
    021.//name1 5
    022.//默认输入格式都是Text,Text
    023.public static class GroupMapper extends
    024.Mapper<Text, Text, TextInt, IntWritable>  {
    025.public IntWritable second=new IntWritable();
    026.public TextInt tx=new TextInt();
    027.@Override
    028.protected void map(Text key, Text value, Context context)
    029.throws IOException, InterruptedException {
    030.String lineKey=key.toString();
    031.String lineValue=value.toString();
    032.int lineInt=Integer.parseInt(lineValue);
    033.tx.setFirstKey(lineKey);
    034.tx.setSecondKey(lineInt);
    035.second.set(lineInt);
    036.context.write(tx, second);
    037.}
    038.}
    039.//设置reduce
    040.public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
    041.{
    042.@Override
    043.protected void reduce(TextInt key, Iterable<IntWritable> values,
    044.Context context)
    045.throws IOException, InterruptedException {
    046.StringBuffer sb=new StringBuffer();
    047.for(IntWritable val:values)
    048.{
    049.sb.append(val+",");
    050.}
    051.if(sb.length()>0)
    052.{
    053.sb.deleteCharAt(sb.length()-1);
    054.}
    055.context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
    056.}
    057.}
    058. 
    059.@Override
    060.public int run(String[] args) throws Exception {
    061.// TODO Auto-generated method stub
    062.Configuration conf=getConf();
    063.Job job=new Job(conf,"SecondarySort");
    064.job.setJarByClass(SortMain.class);
    065.// 设置输入文件的路径,已经上传在HDFS
    066.FileInputFormat.addInputPath(job, new Path(args[0]));
    067.// 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
    068.FileOutputFormat.setOutputPath(job, new Path(args[1]));
    069. 
    070.job.setMapperClass(GroupMapper.class);
    071.job.setReducerClass(GroupReduce.class);
    072.//设置分区方法
    073.job.setPartitionerClass(KeyPartitioner.class);
    074. 
    075.//下面这两个都是针对map端的
    076.//设置分组的策略,哪些key可以放置到一组中
    077.job.setGroupingComparatorClass(TextComparator.class);
    078.//设置key如何进行排序在传递给reducer之前.
    079.//这里就可以设置对组内如何排序的方法
    080./*************关键点**********/
    081.job.setSortComparatorClass(TextIntComparator.class);
    082.//设置输入文件格式
    083.job.setInputFormatClass(KeyValueTextInputFormat.class);
    084.//使用默认的输出格式即TextInputFormat
    085.//设置map的输出key和value类型
    086.job.setMapOutputKeyClass(TextInt.class);
    087.job.setMapOutputValueClass(IntWritable.class);
    088.//设置reduce的输出key和value类型
    089.//job.setOutputFormatClass(TextOutputFormat.class);
    090.job.setOutputKeyClass(Text.class);
    091.job.setOutputValueClass(Text.class);
    092.job.waitForCompletion(true);
    093.int exitCode=job.isSuccessful()?0:1;
    094.return exitCode;
    095.}
    096. 
    097.public static void main(String[] args)  throws Exception
    098.{
    099.int exitCode=ToolRunner.run(new SortMain(), args);
    100.System.exit(exitCode);
    101.}
    102.}

    注意事项

    1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

    2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

    3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

    4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

MapReduce的自制Writable分组输出及组内排序相关推荐

  1. Json数组列表中的数据分组排序、组内排序

    文章目录 问题描述 方式一:先全部排序,在分组排序 方式二:使用HashMap取出来分组再组内排序 方式三:使用TreeMap取出来分组再组内排序 测试代码及耗时 问题描述 现在有一个用户信息数组,用 ...

  2. linq分组再实现组内排序

    哎呀  转前端了   才一年没写后端 linq查询都忘记了 原来炒鸡简单  记录下: 1. 分组查询 ​// 简单分组查询 var query = (from p in queryorderby p. ...

  3. SQL实现分组排序和组内排序(相同分数并列排名)

    创建数据表 CREATE TABLE `heyf_t10` (`empid` int(11) DEFAULT NULL,`deptid` int(11) DEFAULT NULL,`salary` d ...

  4. mysql 组内分组_[MySQL] group by 分组并进行组内排序取得最新一条

    有一个需求是获取指定用户发送的最新的内容,这个时候需要使用group by分组功能 但是怎么获取最新的呢 ? 如果直接进行order by 是不能实现的,因为MysqL会先执行group by 后执行 ...

  5. mysql的组内排序生成序号_sql 分组查询,组内排序, 组内添加序号 (SQL Server 排序函数 ROW_NUMBER和RANK 用法总结)...

    下面的例子和SQL语句均在SQL Server 2008环境下运行通过,使用SQL Server自带的AdventureWorks数据库. -- 添加序列号 -- 行号用法: ROW_NUMBER() ...

  6. mysql分组后组内排名_SQL实现group by 分组后组内排序

    在一个月黑风高的夜晚,自己无聊学习的SQL的时候,练习,突发奇想的想实现一个功能查询,一张成绩表有如下字段,班级ID,英语成绩,数据成绩,语文成绩如下图 实现 查询出 每个班级英语成绩最高的前两名的记 ...

  7. 《数据库SQL实战》从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t。 注意对于重复的emp_no进行忽略。

    题目描述 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t. 注意对于重复的emp_no进行忽略. CREATE TABLE IF NOT EXISTS ...

  8. 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t。

    题目描述 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t. CREATE TABLE IF NOT EXISTS "titles" ...

  9. mysql分组取出每组地一条数据_基于mysql实现group by取各分组最新一条数据

    基于mysql实现group by取各分组最新一条数据 前言: group by函数后取到的是分组中的第一条数据,但是我们有时候需要取出各分组的最新一条,该怎么实现呢? 本文提供两种实现方式. 一.准 ...

最新文章

  1. pam_frpintd.so 错误修复
  2. sql_trace的介绍
  3. 【精品】【分享】盖茨留给职场工作者的十句警告
  4. 深度学习和目标检测系列教程 13-300:YOLO 物体检测算法
  5. [转] 爱情的隐式马尔可夫模型(Love in the Hidden Markov Model)
  6. 类从未使用_如果您从未依赖在线销售,如何优化您的网站
  7. 双指针--Codeforces Round #645 (Div. 2) d题
  8. defconfig、 .config
  9. 内存管理, 对象的生命周期
  10. [警告] multi-字符 character constant [-Wmultichar] ----字符+符号输出错误
  11. 记录——《C Primer Plus (第五版)》第八章编程练习第八题
  12. SecureCrt 常用命令
  13. CHD+CM-1 安装
  14. 教你怎么筛选排除百度搜索引擎的屏蔽违规词
  15. 苹果历代产品中的8大亮点设计(上)
  16. 保健用品智慧供应链管理系统:精细化管理供应商与采购环节,打造敏捷型供应链
  17. 【头部姿态】头部姿态检测(一)
  18. 布拉格捷克理工大学研究团队:Prisma进化版
  19. 网络安全卷么? 你身边的网络安全人过的怎么样呢?
  20. 【WRF如何在输出的wrfoutput文件中设置添加/删除变量】

热门文章

  1. 小论Java类变量的隐私泄露
  2. 风险案例-25期-与有过合作经历客户在新合同约定中过于简单、范围不明确,导致客户对新需求工作量不认可...
  3. Linux tmux分屏工具
  4. linux mint 13 input method of chinese
  5. 常用的 服务器 与 交换机
  6. iphone-common-codes-ccteam源代码 CCUIAlertView.m
  7. SQL查询案例:行列转换[行转列, 使用 CASE WHEN 处理]
  8. 在代码中向ReportViewer动态添加数据源
  9. 2019百度之星初赛-1
  10. IIS6.0打开ASP文件,出现500错误或404错误解决方法