二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

这里主要讲如何使用一个Mapreduce就可以实现二次排序。Hadoop有自带的SecondarySort程序,但这个程序只能对整数进行排序,所以我们需要对其进行改进,使其可以对任意字符串进行排序。下面会分别列出这两个程序的详解。

Hadoop自带的例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)

public static  class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
        public static class Reduce extends Reducer<IntPair, NullWritable,  IntWritable, IntWritable>

在 map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时 InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文 本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable,   Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable,   Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair,   IntWritable>。最终是生成一个List<IntPair,   IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到 一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次 排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个 例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。  
    在reduce阶 段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比 较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用 jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们 的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方 法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

2 Hadoop自带的只对两个整型自带排序例子详解

2.1 测试数据如下所示:

20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job.JobState;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import service.plugin.EJob;public class SecondarySort{/*** @ClassName IntPair* @Description 定义IntPair对象,该对象实现WritableComparable接口,描述第一列和第二列数据,同时完成两列数据的相关操作,这里是对二者进行比较* */public static class IntPair implements WritableComparable<IntPair> {int first;int second;/*** Set the left and right values.*/public void set(int left, int right) {first = left;second = right;}public int getFirst() {return first;}public int getSecond() {return second;}@Override// 反序列化,从流中的二进制转换成IntPairpublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubfirst = in.readInt();second = in.readInt();}@Override// 序列化,将IntPair转化成使用流传送的二进制public void write(DataOutput out) throws IOException {// TODO Auto-generated method stub
            out.writeInt(first);out.writeInt(second);}@Override// key的比较public int compareTo(IntPair o) {// TODO Auto-generated method stubif (first != o.first) {return first < o.first ? -1 : 1;} else if (second != o.second) {return second < o.second ? -1 : 1;} else {return 0;}}// 新定义类应该重写的两个方法,不用这个方法好像也可以// @Override// The hashCode() method is used by the HashPartitioner (the default// partitioner in MapReduce)// public int hashCode() {// return first * 157 + second;// }
 @Overridepublic boolean equals(Object right) {if (right == null)return false;if (this == right)return true;if (right instanceof IntPair) {IntPair r = (IntPair) right;return r.first == first && r.second == second;} else {return false;}}}/*** 分区函数类。根据first确定Partition。*/public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {@Overridepublic int getPartition(IntPair key, IntWritable value, int numPartitions) {System.out.println("FirstPartitioner-----------------------------------------------");System.out.println("Math.abs(key.getFirst() * 127) % numPartitions: " + Math.abs(key.getFirst() * 127) % numPartitions);return Math.abs(key.getFirst() * 127) % numPartitions;}}/*** 分组函数类。只要first相同就属于同一个组。*//** //第一种方法,实现接口RawComparator public static class GroupingComparator* implements RawComparator<IntPair> {* * @Override public int compare(IntPair o1, IntPair o2) { int l =* o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 :* 1); }* * @Override //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。 public int* compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ // TODO* Auto-generated method stub return WritableComparator.compareBytes(b1, s1,* Integer.SIZE/8, b2, s2, Integer.SIZE/8); } }*/// 第二种方法,继承WritableComparatorpublic static class GroupingComparator extends WritableComparator {protected GroupingComparator() {super(IntPair.class, true);System.out.println("GroupingComparator---------------------------------");}@Override// Compare two WritableComparables.public int compare(WritableComparable w1, WritableComparable w2) {IntPair ip1 = (IntPair) w1;IntPair ip2 = (IntPair) w2;int l = ip1.getFirst();int r = ip2.getFirst();return l == r ? 0 : (l < r ? -1 : 1);}}/*** @ClassName Map* @Description 自定义map类,将每行数据进行分拆,第一列的数据存入left变量,第二列数据存入right变量*              在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer*              。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。*/public static class Map extendsMapper<LongWritable, Text, IntPair, IntWritable> {private final IntPair intkey = new IntPair();private final IntWritable intvalue = new IntWritable();public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();// 调用java自己的工具类StringTokenizer(),将map输入的每行字符串按规则进行分割成每个字符串,这些规则有\t\n\r\f,基本上分割的结果都可以保证到最细的字符串粒度StringTokenizer tokenizer = new StringTokenizer(line);int left = 0;int right = 0;if (tokenizer.hasMoreTokens()) {left = Integer.parseInt(tokenizer.nextToken());System.out.println("left: " + left);if (tokenizer.hasMoreTokens())right = Integer.parseInt(tokenizer.nextToken());intkey.set(left, right);intvalue.set(right);context.write(intkey, intvalue);}}}// 自定义reducepublic static class Reduce extendsReducer<IntPair, IntWritable, Text, IntWritable> {private final Text left = new Text();private static final Text SEPARATOR = new Text("------------------------------------------------");public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {context.write(SEPARATOR, null);System.out.println("------------------------------------------------");left.set(Integer.toString(key.getFirst()));for (IntWritable val : values) {System.out.println("reduce: left " + left + "    , val " + val);context.write(left, val);}}}/*** @param args*/public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException {// 读取hadoop配置File jarFile = EJob.createTempJar("bin");ClassLoader classLoader = EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);Configuration conf = new Configuration(true);String[] otherArgs = new String[2];otherArgs[0] = "hdfs://192.168.1.100:9000/test_in/secondary_sort_data.txt";String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());otherArgs[1] = "hdfs://192.168.1.100:9000/test_out/mr-" + time;Job job = new Job(conf, "secondarysort");job.setJarByClass(SecondarySort.class);((JobConf) job.getConfiguration()).setJar(jarFile.toString());job.setMapperClass(Map.class);// 不再需要Combiner类型,因为Combiner的输出类型<Text,// IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用// job.setCombinerClass(Reduce.class);// 分区函数job.setPartitionerClass(FirstPartitioner.class);// 分组函数job.setGroupingComparatorClass(GroupingComparator.class);// Reducer类型job.setReducerClass(Reduce.class);// map输出Key的类型job.setMapOutputKeyClass(IntPair.class);// map输出Value的类型job.setMapOutputValueClass(IntWritable.class);// reduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormatjob.setOutputKeyClass(Text.class);// reduce输出Value的类型job.setOutputValueClass(IntWritable.class);// 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。job.setInputFormatClass(TextInputFormat.class);// 提供一个RecordWriter的实现,负责数据输出。job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// 提交jobif (job.waitForCompletion(false)) {System.out.println("job ok !");} else {System.out.println("job error !");}}
}

执行结果如下所示:

------------------------------------------------
1    2
------------------------------------------------
3    4
------------------------------------------------
5    6
------------------------------------------------
7    8
7    82
------------------------------------------------
12    211
------------------------------------------------
20    21
20    53
20    522
------------------------------------------------
31    42
------------------------------------------------
40    511
------------------------------------------------
50    51
50    52
50    53
50    53
50    54
50    62
50    512
50    522
------------------------------------------------
60    51
60    52
60    53
60    56
60    56
60    57
60    57
60    61
------------------------------------------------
63    61
------------------------------------------------
70    54
70    55
70    56
70    57
70    58
70    58
------------------------------------------------
71    55
71    56
------------------------------------------------
73    57
------------------------------------------------
74    58
------------------------------------------------
203    21
------------------------------------------------
530    54
------------------------------------------------
730    54
------------------------------------------------
740    58

3 改进后的二次排序(可对字符串进行排序)

3.1 测试数据如下所示:

import java
import java
import java
import java
            
import1 org
import org1
import1 org
import2 org2
import org
import2 org1
import1 org
import1 org
import org2
import2 org3
            org
import org
import1 org
importin org
import org
hello time

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import service.plugin.EJob;public class SecondarySortString {// 自己定义的key类应该实现WritableComparable接口public static class IntPair implements WritableComparable<IntPair> {String first;String second;/*** Set the left and right values.*/public void set(String left, String right) {first = left;second = right;}public String getFirst() {return first;}public String getSecond() {return second;}// 反序列化,从流中的二进制转换成IntPairpublic void readFields(DataInput in) throws IOException {first = in.readUTF();second = in.readUTF();}// 序列化,将IntPair转化成使用流传送的二进制public void write(DataOutput out) throws IOException {out.writeUTF(first);out.writeUTF(second);}// 重载 compareTo 方法,进行组合键 key 的比较,该过程是默认行为。// 分组后的二次排序会隐式调用该方法。public int compareTo(IntPair o) {if (!first.equals(o.first)) {return first.compareTo(o.first);} else if (!second.equals(o.second)) {return second.compareTo(o.second);} else {return 0;}}// 新定义类应该重写的两个方法// The hashCode() method is used by the HashPartitioner (the default// partitioner in MapReduce)public int hashCode() {return first.hashCode() * 157 + second.hashCode();}public boolean equals(Object right) {if (right == null)return false;if (this == right)return true;if (right instanceof IntPair) {IntPair r = (IntPair) right;return r.first.equals(first) && r.second.equals(second);} else {return false;}}}/*** 分区函数类。根据first确定Partition。*/public static class FirstPartitioner extends Partitioner<IntPair, Text> {public int getPartition(IntPair key, Text value, int numPartitions) {return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;}}/*** 分组函数类。只要first相同就属于同一个组。*//** //第一种方法,实现接口RawComparator public static class GroupingComparator* implements RawComparator<IntPair> { public int compare(IntPair o1,* IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r* ? 0 : (l < r ? -1 : 1); }* //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。 public int* compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ return* WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2,* Integer.SIZE/8); } }*/// 第二种方法,继承WritableComparatorpublic static class GroupingComparator extends WritableComparator {protected GroupingComparator() {super(IntPair.class, true);}// Compare two WritableComparables.// 重载 compare:对组合键按第一个自然键排序分组public int compare(WritableComparable w1, WritableComparable w2) {IntPair ip1 = (IntPair) w1;IntPair ip2 = (IntPair) w2;String l = ip1.getFirst();String r = ip2.getFirst();return l.compareTo(r);}}// 自定义mappublic static class Map extends Mapper<LongWritable, Text, IntPair, Text> {private final IntPair keyPair = new IntPair();String[] lineArr = null;public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();if(line.isEmpty()){return;}lineArr = line.split(" ", -1);keyPair.set(lineArr[0], lineArr[1]);context.write(keyPair, value);}}// 自定义reducepublic static class Reduce extends Reducer<IntPair, Text, Text, Text> {private static final Text SEPARATOR = new Text("------------------------------------------------");public void reduce(IntPair key, Iterable<Text> values, Context context)throws IOException, InterruptedException {context.write(SEPARATOR, null);for (Text val : values) {context.write(null, val);}}}public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException {File jarFile = EJob.createTempJar("bin");ClassLoader classLoader = EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);Configuration conf = new Configuration(true);String[] otherArgs = new String[2];otherArgs[0] = "hdfs://192.168.1.100:9000/data/test_in/secondary_sort_data_string.txt";String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());otherArgs[1] = "hdfs://192.168.1.100:9000/data/test_out/mr-" + time;// 实例化一道作业Job job = new Job(conf, "secondarysort");job.setJarByClass(SecondarySort.class);((JobConf) job.getConfiguration()).setJar(jarFile.toString());// Mapper类型job.setMapperClass(Map.class);// 不再需要Combiner类型,因为Combiner的输出类型<Text,// IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用// job.setCombinerClass(Reduce.class);// Reducer类型job.setReducerClass(Reduce.class);// 分区函数job.setPartitionerClass(FirstPartitioner.class);// 分组函数job.setGroupingComparatorClass(GroupingComparator.class);// map 输出Key的类型job.setMapOutputKeyClass(IntPair.class);// map输出Value的类型job.setMapOutputValueClass(Text.class);// rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormatjob.setOutputKeyClass(Text.class);// rduce输出Value的类型job.setOutputValueClass(Text.class);// 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。job.setInputFormatClass(TextInputFormat.class);// 提供一个RecordWriter的实现,负责数据输出。job.setOutputFormatClass(TextOutputFormat.class);// 输入hdfs路径FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));// 输出hdfs路径
//        FileSystem.get(conf).delete(new Path(args[1]), true);FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// 提交jobSystem.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3.3 执行结果如下所示:

------------------------------------------------
            org
            
------------------------------------------------
hello time
------------------------------------------------
import java
import java
import java
import java
import org
import org
import org
import org1
import org2
------------------------------------------------
import1 org
import1 org
import1 org
import1 org
import1 org
------------------------------------------------
import2 org1
import2 org2
import2 org3
------------------------------------------------
importin org

转https://www.cnblogs.com/minkaihui/p/4125672.html

转载于:https://www.cnblogs.com/likanmama/p/7804949.html

hadoop二次排序相关推荐

  1. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  2. Hadoop Streaming二次排序

    由于Hadoop机器内存不足,所以需要把数据mapred进来跑. 这样,就需要,同一个key下的输入数据是有序的,即:对于keyA的数据,要求data1先来,之后data2再来--.所以需要对data ...

  3. Hadoop Mapreduce分区、分组、二次排序过程详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动    (1)最简单的过程:  map - reduce    (2)定制了partition ...

  4. Hadoop Mapreduce分区、分组、二次排序过程详解[转]

    徐海蛟 教学用途 1.MapReduce中数据流动 (1)最简单的过程: map - reduce (2)定制了partitioner以将map的结果送往指定reducer的过程: map - par ...

  5. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  6. 【大数据分析常用算法】1.二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 简介 本算法教程系列建立在您已经有了spark以及Hadoop的开发基础,如果没有的话,请观看本博客的hadoop相关教程或者 ...

  7. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  8. MapReduce二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...

  9. Mapreduce的排序、全排序以及二次排序

    一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可 ...

最新文章

  1. Git 常用操作(3)- 本地分之显示、创建、切换、合并和删除操作
  2. Codeforces Round #505 D. Recovering BST(区间DP)
  3. 智能导航短信告警的一个逻辑处理
  4. Java注释教程– ULTIMATE指南(PDF下载)
  5. python123程序设计题答案第三周_Python 3 程序设计学习指导与习题解答
  6. iso硬盘安装 凤凰os_凤凰os系统下载_凤凰系统phoenix os官方下载-188软件园
  7. 新趋势下的云计算安全行业前沿认证 | CCSK
  8. CAD图纸打印自动排版
  9. uniapp ios 沙盒测试支付(苹果支付)
  10. Php框架CodeIgniter 学习
  11. ios uri正则表达式_众果搜的博客
  12. Chrome配置Proxy代理
  13. Google Earth Engine(GEE)——注册GEE被拒绝,官网的回复给出答案
  14. opencv之图像平移
  15. C语言试题十之将两个两位数的正整数a b合并形成一个整数放在c中。合并的方式是:将a数的十位和个位数依次放在c的十位和千位上,b数的十位和个位数依次放在c数的个位和百位上。
  16. Python基础知识总结—数据类型,列表,元组,集合,字典
  17. wipe、root、底包、rsd是什么意思?小白入门释义
  18. vartualBox安装oracle数据库
  19. 二叉树的遍历(先序、中序、后序)
  20. 实现抖音视频抖动效果---OpenCV-Python开发指南(53)

热门文章

  1. Absolute C++ Chapter 3 Self-Test Exercise(3)
  2. vue调用顺序(初学版) index.html → main.js → app.vue → index.js → components/组件 测试
  3. javascript+HTML+CSS面试题
  4. 电子书下载:Silverlight 5 in Action
  5. Python判断变量的数据类型的两种方法
  6. sublime text3 package control 报错
  7. debian apt-get联网安装mysql服务
  8. 我的2017年前端之路总结
  9. Hook的两个小插曲
  10. [Unity3d]多个摄像机叠加效果